I tried to run the Iceberg sink demo following the pulsar-io-lakehouse sink docs. It fails to commit records because the getSchema result is unexpected.
Describe the bug
My test flow is shown below:
Create topic & produce data
First, I produce a large amount of data to the test topic persistent://public/default/iceberg_test via the Flink connector.
The message format looks like:
22061772,1670896138459,mmdc-bigdata-test,11.156.128.57,jobmanager,11.156.128.75,2022-12-13 09:48:58,29
and set the topic schema with the bin/pulsar-admin schemas upload command.
Therefore, the test-topic schema is shown below:
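For illustration, the upload step could look like the following. This is a hedged sketch, not the exact file used in my test: string-schema.json is a hypothetical filename, and the file body follows Pulsar's schema-definition upload format (type, schema, properties).

```shell
# Hypothetical schema-definition file; Pulsar's upload format wraps
# the schema type, the schema definition, and optional properties.
cat > string-schema.json <<'EOF'
{
  "type": "STRING",
  "schema": "",
  "properties": {}
}
EOF

# Upload it to the test topic
bin/pulsar-admin schemas upload \
  persistent://public/default/iceberg_test \
  --filename string-schema.json
```

Note that for primitive types such as STRING the "schema" field is empty, which is relevant to the failure described below.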
Run the lakehouse sink connector
The logs show that the Iceberg sink failed with a schema exception.
There are two questions:
Why does the getSchemaType result differ between these two ways?
record.getSchema().getSchemaInfo().getSchemaDefinition()=null
so records will be skipped in sinkWriter.run.
And I found that in getSchemaDefinition, if the SchemaType is STRING or BYTES, its SchemaDefinition will always be null, which causes the sink to fail.
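A minimal, Pulsar-free sketch of the behavior I observed. SchemaType, getSchemaDefinition, and the skip guard here are hypothetical stand-ins for the real Pulsar client / connector code, used only to show why every record with a STRING or BYTES schema ends up skipped:

```java
public class SchemaDefinitionSketch {
    // Stand-in for Pulsar's SchemaType enum (only the cases relevant here).
    enum SchemaType { AVRO, JSON, STRING, BYTES }

    // Mirrors the observed behavior: struct schemas carry a definition,
    // while primitive STRING/BYTES schemas return null.
    static String getSchemaDefinition(SchemaType type, String definition) {
        switch (type) {
            case AVRO:
            case JSON:
                return definition;
            default:
                return null; // STRING/BYTES: always null
        }
    }

    public static void main(String[] args) {
        String def = getSchemaDefinition(SchemaType.STRING, "ignored");
        // sinkWriter.run-style guard: a null definition means the record
        // is silently skipped, so nothing is ever committed to Iceberg.
        if (def == null) {
            System.out.println("record skipped: schema definition is null");
        }
    }
}
```

With a STRING-schema topic, every record takes the null branch, which matches the "records skipped, commit fails" symptom above.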
Environment
Pulsar version: 2.9.3
Deployment: On-premises cluster
pulsar-io-lakehouse-connector version: 2.9.3.16
4 brokers & 1 function worker (running as a separate process on a separate machine)