streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0

[BUG] Exception when running Flink SQL Client #536

Closed mhaseebmlk closed 2 years ago

mhaseebmlk commented 2 years ago

**Describe the bug**
Every time I run the basic query `SELECT * FROM <topic-name>;`, I get the following exception: `java.lang.NoClassDefFoundError: com/google/protobuf/Descriptors$FieldDescriptor$JavaType`. See below for the complete trace:

Flink SQL> use catalog pulsarcatalog;
[INFO] Execute statement succeed.

Flink SQL> use `public/default`;
[INFO] Execute statement succeed.

Flink SQL> select * from `test-topic-40`;

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: com/google/protobuf/Descriptors$FieldDescriptor$JavaType
    at org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator$1.<clinit>(SimpleSchemaTranslator.java:359)
    at org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator.schemaInfo2SqlType(SimpleSchemaTranslator.java:298)
    at org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator.pulsarSchemaToFieldsDataType(SimpleSchemaTranslator.java:257)
    at org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator.pulsarSchemaToTableSchema(SimpleSchemaTranslator.java:246)
    at org.apache.flink.streaming.connectors.pulsar.internal.PulsarCatalogSupport.schemaToCatalogTable(PulsarCatalogSupport.java:193)
    at org.apache.flink.streaming.connectors.pulsar.internal.PulsarCatalogSupport.getTableSchema(PulsarCatalogSupport.java:96)
    at org.apache.flink.table.catalog.pulsar.PulsarCatalog.getTable(PulsarCatalog.java:168)
    at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:425)
    at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:395)
    at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:73)
    at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
    at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
    at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
    at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176)
    at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326)
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: com.google.protobuf.Descriptors$FieldDescriptor$JavaType
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 45 more
Exception in thread "AsyncHttpClient-7-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
    at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:247)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:430)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:396)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:388)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:254)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:245)
    at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:218)
    at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:533)
    at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:500)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:257)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1042)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 15 more
Exception in thread "AsyncHttpClient-4-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/util/concurrent/DefaultPromise$1
    at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499)
    at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
    at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1051)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 8 more

Shutting down the session...
done.
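The trace above actually contains three separate failures (one in the `main` thread and one in each of two `AsyncHttpClient` threads), and the innermost `Caused by:` line of each block is the most useful part for triage. Below is a minimal sketch that extracts those lines from a saved trace, assuming plain-text input. `root_causes` is a hypothetical helper, not part of Flink or Pulsar:

```python
import re
from typing import Optional

# Matches a thread header such as: Exception in thread "main" java.lang.Foo: msg
HEADER = re.compile(r'^Exception in thread "[^"]*" (.+)$')
CAUSE_PREFIX = "Caused by: "


def root_causes(trace: str) -> list[str]:
    """Return the innermost exception of each 'Exception in thread' block.

    The innermost exception is the last 'Caused by:' line of the block, or
    the header's own exception when the block has no 'Caused by:' line.
    Text before the first header is ignored.
    """
    causes: list[str] = []
    current: Optional[str] = None  # innermost exception seen so far in this block
    for line in trace.splitlines():
        stripped = line.strip()
        header = HEADER.match(stripped)
        if header:
            if current is not None:
                causes.append(current)  # close the previous block
            current = header.group(1)
        elif stripped.startswith(CAUSE_PREFIX) and current is not None:
            current = stripped[len(CAUSE_PREFIX):]
    if current is not None:
        causes.append(current)  # close the last block
    return causes
```

Fed the full trace from this issue, the function returns three entries: the missing `com.google.protobuf.Descriptors$FieldDescriptor$JavaType` class from the `main` thread, followed by the missing shaded Netty classes `PoolArena$1` and `DefaultPromise$1` from the two `AsyncHttpClient` threads.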

**To Reproduce**
My setup uses the following applications and versions:

Steps to reproduce the behavior:

  1. Download Pulsar v2.8.1 from https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz
  2. Extract the binary: tar -xzf apache-pulsar-2.8.1-bin.tar.gz
  3. cd into the directory and run Pulsar in standalone mode: ./bin/pulsar standalone
  4. Start writing to a test topic called my-topic. For this I am using Pulsar's Python client as shown in https://pulsar.apache.org/docs/en/client-libraries-python/#producer-example.
  5. Download the Flink v1.13.6 binary for Scala 2.11 from https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
  6. Extract the binary: tar -xzf flink-1.13.6-bin-scala_2.11.tgz
  7. Start the Flink cluster: ./bin/start-cluster.sh
  8. Download the Pulsar Flink SQL Connector from https://search.maven.org/artifact/io.streamnative.connectors/pulsar-flink-sql-connector_2.11/1.13.1.0/jar.
  9. Start the Flink SQL Client with `./bin/sql-client.sh embedded --jar pulsar-flink-sql-connector_2.11-1.13.1.0.jar`, using the following `sql-client-defaults.yaml` configuration:

    tables: [] # empty list
    functions: [] # empty list

    catalogs:

    execution:
      planner: blink
      type: streaming
      time-characteristic: event-time
      periodic-watermarks-interval: 200
      result-mode: table
      max-table-result-rows: 1000000
      parallelism: 1
      max-parallelism: 128
      min-idle-state-retention: 0
      max-idle-state-retention: 0
      current-catalog: default_catalog
      current-database: default_database
      restart-strategy:
        type: fallback

    deployment:
      response-timeout: 5000
      gateway-address: ""
      gateway-port: 0


  10. Once the client has started, switch to `pulsarcatalog` and its default database `public/default`: ``use catalog pulsarcatalog;`` and ``use `public/default`;``.
  11. Finally, run the following simple query: ``SELECT * FROM `my-topic`;``
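Names such as `my-topic` (a hyphen) and `public/default` (a slash) are not valid unquoted Flink SQL identifiers, which is why the session log wraps them in backticks. The following is a small sketch that generates the statements from steps 10 and 11 with every identifier quoted. The helper names are hypothetical, and escaping an embedded backtick by doubling it is an assumption based on Calcite's quoting rules:

```python
def quote_identifier(name: str) -> str:
    """Wrap a Flink SQL identifier in backticks.

    Assumes the Calcite convention that a literal backtick inside a quoted
    identifier is written as two backticks.
    """
    return "`" + name.replace("`", "``") + "`"


def session_statements(catalog: str, database: str, topic: str) -> list[str]:
    """Build the USE / SELECT statements with every identifier quoted."""
    return [
        f"USE CATALOG {quote_identifier(catalog)};",
        f"USE {quote_identifier(database)};",
        f"SELECT * FROM {quote_identifier(topic)};",
    ]


for statement in session_statements("pulsarcatalog", "public/default", "my-topic"):
    print(statement)
```

Quoting every identifier, even ones like `pulsarcatalog` that would parse without it, keeps the helper simple and avoids having to decide which characters need quoting.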

**Expected behavior**
I expect the query to stream the real-time events created by the producer, similar to what is shown in https://github.com/morsapaes/flink-sql-pulsar#using-pulsar-metadata.

**Additional context**
I am just trying to set up a basic use case for now. Later I will write schematized data to Pulsar, with the goal of processing that data using Flink SQL.
mhaseebmlk commented 2 years ago

cc @jianyun8023 @yjshen @syhily for visibility.

sijie commented 2 years ago

@mhaseebmlk Noted. @nlu90 @syhily Can someone take a look at this issue?

nlu90 commented 2 years ago

@mhaseebmlk The Pulsar Flink connector supports the `protobuf_native` format, so it depends on the protobuf runtime (`protobuf-java`). You need to either put that jar under the `lib/` directory or add it with `--jar` when starting the SQL client. You also need to put some Pulsar- and Avro-related jars under `lib/`.

And this is the command I used to start the SQL client:

./bin/sql-client.sh embedded --jar /Users/nlu/workspace/stream-native/pulsar-flink/pulsar-flink-connector/target/pulsar-flink-connector-origin-1.13.1.0.jar  --jar /Users/nlu/.m2/repository/com/google/protobuf/protobuf-java/3.11.1/protobuf-java-3.11.1.jar
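In the command above, the protobuf jar is referenced by its path in the local Maven repository, which follows the standard Maven layout (group ID segments as directories, then artifact ID, version, and `artifactId-version.jar`). Each extra dependency gets its own `--jar` flag. The sketch below derives such paths and assembles the argument list, for example to pass to `subprocess.run`. The helper names are hypothetical, and the repository root `/home/user/.m2/repository` is an assumed placeholder:

```python
from pathlib import PurePosixPath


def maven_jar_path(group_id: str, artifact_id: str, version: str) -> str:
    """Relative path of a jar in the standard Maven repository layout:
    group ID segments as directories, then artifact ID, version, and file name."""
    return str(PurePosixPath(*group_id.split("."), artifact_id, version,
                             f"{artifact_id}-{version}.jar"))


def sql_client_argv(jars: list[str]) -> list[str]:
    """Arguments for an embedded Flink SQL client session, one --jar per jar."""
    argv = ["./bin/sql-client.sh", "embedded"]
    for jar in jars:
        argv += ["--jar", jar]
    return argv


# Hypothetical usage: the local repository root is an assumption.
local_repo = "/home/user/.m2/repository"
protobuf_jar = f"{local_repo}/{maven_jar_path('com.google.protobuf', 'protobuf-java', '3.11.1')}"
argv = sql_client_argv(["pulsar-flink-sql-connector_2.11-1.13.1.0.jar", protobuf_jar])
```

The same relative path appended to `https://repo1.maven.org/maven2/` gives the Maven Central download URL, which is one way to fetch `protobuf-java` without a local Maven build.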

And here is an example `lib/` directory with all the dependencies:

➜  flink-1.13.2 ls lib
avro-1.10.0.jar
flink-avro-1.13.2.jar
flink-csv-1.13.2.jar
flink-dist_2.11-1.13.2.jar
flink-json-1.13.2.jar
flink-protobuf-2.7.6.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-avro-1.13.2.jar
flink-table-blink_2.11-1.13.2.jar
flink-table_2.11-1.13.2.jar
jackson-annotations-2.11.4.jar
jackson-core-2.11.4.jar
jackson-databind-2.11.4.jar
jul-to-slf4j-1.7.12.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
pulsar-client-admin-api-2.8.0.jar
pulsar-client-all-2.8.0.jar
pulsar-client-api-2.8.0.jar
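When a `NoClassDefFoundError` like the one in this issue appears, it can help to check which jars actually ship the missing class before adding anything to `lib/`. The following is a minimal sketch, assuming the class name is copied from the error message. `class_entry` and `jars_containing` are hypothetical helpers. Nested classes map to `Outer$Inner.class` entries, which is how the Java compiler names them:

```python
import zipfile
from pathlib import Path


def class_entry(class_name: str) -> str:
    """Map a class name to its .class entry inside a jar.

    Accepts the slashed form printed by NoClassDefFoundError as well as the
    dotted form printed by ClassNotFoundException; nested classes keep their
    '$' separators, e.g. Outer$Inner.class.
    """
    return class_name.replace(".", "/") + ".class"


def jars_containing(lib_dir: str, class_name: str) -> list[str]:
    """Return the file names of the jars in lib_dir that contain class_name."""
    entry = class_entry(class_name)
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits
```

For example, `jars_containing("lib", "com/google/protobuf/Descriptors$FieldDescriptor$JavaType")` run from the Flink directory returns an empty list when no jar in `lib/` provides the unshaded protobuf class. The check is by exact entry name, so relocated copies (such as the `org.apache.pulsar.shade` packages seen in the trace) do not count. Jars passed with `--jar` are also outside its scope, since it only scans one directory.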
sijie commented 2 years ago

@mhaseebmlk does Neng's comment answer your question?

mhaseebmlk commented 2 years ago

Yes, it does. Thank you. I'll go ahead and close this issue.