streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0

[BUG] NoClassDefFoundError and NPE when using PulsarDeserializer to deserialize Avro messages into Flink rows #326

Open longtengz opened 3 years ago

longtengz commented 3 years ago

Describe the bug

Basically, I want to use PulsarDeserializer to convert Avro data into Flink rows. However, it keeps failing with a NoClassDefFoundError and an NPE.

The stdout in the Flink web UI shows the following:

Exception in thread "AsyncHttpClient-7-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
    at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:298)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:465)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:431)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:423)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:277)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:268)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:239)
    at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:491)
    at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:458)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:256)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1042)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 15 more

This is quite confusing to me, because I checked that the class org/apache/pulsar/shade/io/netty/buffer/PoolArena$1 is present in the fat job jar.
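One way to repeat that check, and at the same time see whether the same shaded class is also packaged in another jar (for example one under Flink's lib directory), is to open each candidate jar and look for the entry. The sketch below uses only java.util.jar; the class name JarEntryLocator and the idea of passing the jar paths in as a list are illustrative assumptions, not part of pulsar-flink or Flink.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

/** Hypothetical helper (not a Flink or Pulsar API): reports which jars contain a given class entry. */
final class JarEntryLocator {
    private JarEntryLocator() {}

    /** Converts a binary class name such as "a.b.C$1" into its jar entry path "a/b/C$1.class". */
    static String entryName(String binaryClassName) {
        return binaryClassName.replace('.', '/') + ".class";
    }

    /** Returns the jars from {@code jars} that contain {@code entry}, preserving input order. */
    static List<Path> jarsContaining(List<Path> jars, String entry) {
        List<Path> hits = new ArrayList<>();
        for (Path jar : jars) {
            // try-with-resources closes each JarFile even if the lookup fails
            try (JarFile jarFile = new JarFile(jar.toFile())) {
                if (jarFile.getJarEntry(entry) != null) {
                    hits.add(jar);
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Cannot read " + jar, e);
            }
        }
        return hits;
    }
}
```

Called with the job jar plus every jar in flink/lib and entryName("org.apache.pulsar.shade.io.netty.buffer.PoolArena$1"), more than one hit means the class can be loaded from two places.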

As for why I want Avro converted to Flink rows: I need to process Pulsar messages by field name and field position. I have tons of topics, and I don't want to maintain tons of classes whose only purpose is deserialization, which is why AvroDeser is a no-go.

To Reproduce

The code is very simple:

JSONOptions jsonOptions = new JSONOptions(new HashMap<>(), "", "");
boolean useExtendField = true;
PulsarDeserializationSchema<Row> deserializer = new PulsarDeserializer(schemaInfo, jsonOptions, useExtendField);

FlinkPulsarSource<Row> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, deserializer, props);
source.setStartFromEarliest();

DataStream<Row> stream = env.addSource(source);

stream.print();

Expected behavior

The Flink rows are printed.

Log messages

The CLI stdout:

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 744d6c7ce1cb2f9e7ffc8b4a377b0ee5)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 744d6c7ce1cb2f9e7ffc8b4a377b0ee5)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
        at org.myorg.quickstart.StreamingJob.main(StreamingJob.java:131)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 744d6c7ce1cb2f9e7ffc8b4a377b0ee5)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.lang.NullPointerException
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:978)
        at org.apache.pulsar.client.impl.ReaderBuilderImpl.create(ReaderBuilderImpl.java:77)
        at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.createActualReader(ReaderThread.java:134)
        at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:96)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.lang.NullPointerException
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.pulsar.client.impl.ReaderBuilderImpl.create(ReaderBuilderImpl.java:75)
        ... 2 more
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.lang.NullPointerException
        at org.apache.pulsar.client.impl.ConnectionHandler.handleConnectionError(ConnectionHandler.java:88)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$null$1(BinaryProtoLookupService.java:156)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:519)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:673)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        ... 30 more
Caused by: java.lang.NullPointerException
        at org.apache.pulsar.client.impl.ConsumerImpl.connectionOpened(ConsumerImpl.java:870)
        at org.apache.pulsar.client.impl.ConnectionHandler.lambda$grabCnx$0(ConnectionHandler.java:73)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        ... 31 more

The Flink TaskManager log:

2021-05-13 19:11:07,170 DEBUG org.apache.pulsar.common.protocol.PulsarDecoder              [] - [localhost/127.0.0.1:6650] Received cmd LOOKUP_RESPONSE
2021-05-13 19:11:07,171 DEBUG org.apache.pulsar.client.impl.ClientCnx                      [] - Received Broker lookup response: Connect
2021-05-13 19:11:07,172 INFO  org.apache.pulsar.client.impl.ConsumerImpl                   [] - [persistent://aaa/bbb/ccc][reader-caf46c8cf0] Subscribing to topic on cnx [id: 0xb4519008, L:/127.0.0.1:57752 - R:localhost/127.0.0.1:6650], consumerId 0
2021-05-13 19:11:07,176 WARN  org.apache.pulsar.client.impl.ConnectionHandler              [] - [persistent://aaa/bbb/ccc] [reader-caf46c8cf0] Error connecting to broker: java.lang.NullPointerException
2021-05-13 19:11:07,179 WARN  org.apache.pulsar.client.impl.ConnectionHandler              [] - [persistent://aaa/bbb/ccc] [reader-caf46c8cf0] Could not get connection to broker: java.lang.NullPointerException -- Will try again in 0.1 s

Possible cause

In PulsarDeserializer, getSchema() returns null:

https://github.com/streamnative/pulsar-flink/blob/9094562681be84698bf8f159e0ea7c10b87048f7/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java#L515

This leaves ConsumerImpl.schema null, so the following line throws the NullPointerException:

https://github.com/apache/pulsar/blob/1926dee082c70f4d5dcad54a94fdee57e197901a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L727
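The failure mode in this possible cause (a null schema handed to the reader and only dereferenced later, on a Netty callback thread) surfaces far from its origin and is retried every 0.1 s. A minimal sketch of a fail-fast guard follows, assuming a hypothetical SchemaSource interface that stands in for whatever object supplies the schema to the reader builder; neither SchemaSource nor SchemaGuard exists in pulsar-flink or the Pulsar client.

```java
import java.util.Objects;

/** Hypothetical stand-in for a deserializer that supplies the schema used to build a reader. */
interface SchemaSource<S> {
    S getSchema();
}

/** Hypothetical guard: validates the schema eagerly instead of letting a callback thread hit an NPE. */
final class SchemaGuard {
    private SchemaGuard() {}

    static <S> S requireSchema(SchemaSource<S> source, String topic) {
        Objects.requireNonNull(source, "schema source");
        S schema = source.getSchema();
        if (schema == null) {
            // Name the deserializer and topic so the error points at the cause, not at ConsumerImpl.
            throw new IllegalStateException(
                "Deserializer " + source.getClass().getName()
                    + " returned a null schema for topic " + topic
                    + "; the reader cannot subscribe without one");
        }
        return schema;
    }
}
```

Whether such a check belongs in the connector or in user code is a design choice; the point is that the error is raised once, at setup time, with the topic and deserializer in the message.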

longtengz commented 3 years ago

I got it working with PulsarDeserializationSchema.valueOnly(new AvroRowDeserializationSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8))).

Now I'm not sure what the purpose of PulsarDeserializer is, if what I described is not a bug.

jianyun8023 commented 3 years ago

@longtengz Which version of the connector are you using? PulsarDeserializer is about to be deprecated; it is still used, sparingly, for Pulsar's basic types. Please use PulsarDeserializationSchema.valueOnly(new AvroRowDeserializationSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8))) to decode Avro data.
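AvroRowDeserializationSchema emits positional rows whose field order should follow the Avro record schema. To address fields by name, as the original report wanted, without one class per topic, one option is a small name-to-position index built once per schema. The sketch below is plain Java and takes the ordered field names as input; obtaining them from the Avro schema (or from the row type the deserialization schema produces) is assumed and not shown. FieldIndex is a hypothetical name, and Object[] stands in for Flink's Row.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical name-to-position lookup for positional records such as Flink rows. */
final class FieldIndex {
    private final List<String> names;
    private final Map<String, Integer> positions = new HashMap<>();

    FieldIndex(List<String> orderedFieldNames) {
        this.names = Collections.unmodifiableList(new ArrayList<>(orderedFieldNames));
        for (int i = 0; i < names.size(); i++) {
            // Reject schemas where two fields would map to the same name.
            if (positions.put(names.get(i), i) != null) {
                throw new IllegalArgumentException("Duplicate field name: " + names.get(i));
            }
        }
    }

    /** Position of {@code name}; throws if the schema has no such field. */
    int positionOf(String name) {
        Integer pos = positions.get(name);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field: " + name + " (known: " + names + ")");
        }
        return pos;
    }

    /** Reads a field by name from a positional record; Object[] stands in for Row here. */
    Object get(Object[] record, String name) {
        return record[positionOf(name)];
    }
}
```

With Flink's Row the equivalent read would be row.getField(index.positionOf("name")); building the index once per topic schema keeps each per-record lookup to a single hash-map hit.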

jianyun8023 commented 3 years ago

Regarding the org/apache/pulsar/shade/io/netty/buffer/PoolArena$1 problem: most likely the same class also exists in flink_lib. Keep only one copy (either one). This class is in the pulsar-client-all package, and it also exists in the fat connector jar.
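To confirm the duplicate-class explanation at runtime, a class loader can report every location from which it can see a given .class resource. The sketch below relies only on ClassLoader.getResources, whose default implementation consults the parent loader as well as the loader's own sources; ResourceCopies is a hypothetical name, and you would pass it the loader of interest (for example getClass().getClassLoader() from inside a user function).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic: lists every location a class loader can see for one class file. */
final class ResourceCopies {
    private ResourceCopies() {}

    static List<URL> locate(ClassLoader loader, String binaryClassName) {
        String resource = binaryClassName.replace('.', '/') + ".class";
        try {
            // getResources returns matches from the parent chain and from this loader's own URLs.
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Two or more URLs for org.apache.pulsar.shade.io.netty.buffer.PoolArena$1 (say, one under flink/lib and one inside the job jar) would indicate the duplication described above.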

syhily commented 3 years ago

@jianyun8023 I think this is a bug; we should drop PulsarDeserializer if it's about to be deprecated.

jianyun8023 commented 3 years ago

@syhily @longtengz Well, PulsarDeserializer is the class that should be dropped.