streamnative / flink


[SQL Connector] Consumer cannot consume from Pulsar topics written by SQL connector with avro schema #126

Closed imaffe closed 1 year ago

imaffe commented 2 years ago

In the writeToExplicitTableAndReadWithJsonSchemaUsingPulsarConsumer test case, we get:

org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Failed to subscribe whAmv with 1 partitions
{"errorMsg":"Topic does not have schema to check","reqId":2453711855417924595, "remote":"localhost/127.0.0.1:50786", "local":"/127.0.0.1:50800"}

This happens because we write the data to the Pulsar topic with a bytes schema, while the Pulsar consumer performs a schema compatibility check on subscribe. We need to understand whether this can be changed with a different configuration entry (e.g., by enabling the schema evolution feature). A minimal repro sketch is below.
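For reference, a minimal sketch of the failure mode, assuming a local broker; the topic name whAmv comes from the log above, and the User POJO is a hypothetical stand-in for the test's record type:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class SchemaCheckRepro {
    // Hypothetical POJO standing in for the record type used by the test.
    public static class User {
        public String name;
        public int age;
    }

    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // Subscribing with a typed schema triggers the broker-side
            // compatibility check; since the topic was written with
            // Schema.BYTES, no schema is registered and the check fails with
            // IncompatibleSchemaException: "Topic does not have schema to check".
            try (Consumer<User> ignored = client.newConsumer(Schema.AVRO(User.class))
                    .topic("whAmv")
                    .subscriptionName("avro-sub")
                    .subscribe()) {
            } catch (PulsarClientException e) {
                System.err.println("Typed subscribe failed: " + e.getMessage());
            }

            // Subscribing with Schema.BYTES skips the typed check and works,
            // but leaves deserialization entirely to the application.
            try (Consumer<byte[]> bytesConsumer = client.newConsumer(Schema.BYTES)
                    .topic("whAmv")
                    .subscriptionName("bytes-sub")
                    .subscribe()) {
                // consume raw bytes here
            }
        }
    }
}
```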

imaffe commented 2 years ago

Key method: addSchemaIfIdleOrCheckCompatible in the Pulsar schema registry service.

imaffe commented 2 years ago

Root cause might be: https://github.com/apache/pulsar/issues/17354

imaffe commented 2 years ago

When there is an active producer, (!producers.isEmpty()) is true and the broker enters the compatibility check. I guess BYTES is treated as having no schema. See the paraphrased sketch below.
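For context, a loose paraphrase of what the broker-side check appears to do; this is my reading of the observed behaviour, NOT the actual Pulsar source, and all names below are made up for the sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative paraphrase of the broker-side decision; not Pulsar code.
class SchemaCheckSketch {
    List<Object> producers = new ArrayList<>(); // active producers on the topic
    boolean schemaRegistered;                    // whether the topic has a stored schema

    CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(byte[] schema) {
        if (producers.isEmpty() && !schemaRegistered) {
            // Topic is idle and has no schema yet: just register the new one.
            return addSchema(schema);
        }
        // An active producer (even a BYTES one, which registers no schema)
        // forces the compatibility path, which then fails with
        // "Topic does not have schema to check".
        return checkCompatible(schema);
    }

    CompletableFuture<Void> addSchema(byte[] schema) {
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> checkCompatible(byte[] schema) {
        return CompletableFuture.failedFuture(
                new IllegalStateException("Topic does not have schema to check"));
    }
}
```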

imaffe commented 2 years ago

Then the current behaviour makes sense, because autoSchemaUpdate requires that the topic has no data. We need to redesign PulsarTableSerializationSchema, maybe with something similar to the source-side schema evolution.

imaffe commented 2 years ago

Either we ask Pulsar to support reading bytes with a designated schema, or we upload the schema info to the Pulsar broker from the Flink side.

imaffe commented 2 years ago

We can ask users to upload the schema before they start sending any data. This is a workaround; a sketch using the Pulsar admin API follows.
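For example, a user could register the Avro schema up front via the admin API before any producer attaches. The topic name and record schema here are illustrative, and this assumes a recent Pulsar client where SchemaInfo.builder() is available:

```java
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class UploadSchemaFirst {
    public static void main(String[] args) throws Exception {
        // Illustrative Avro record schema; in practice this must match what
        // the Flink sink actually writes.
        String avroSchemaJson =
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
              + "{\"name\":\"name\",\"type\":\"string\"},"
              + "{\"name\":\"age\",\"type\":\"int\"}]}";

        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            SchemaInfo info = SchemaInfo.builder()
                    .name("User")
                    .type(SchemaType.AVRO)
                    .schema(avroSchemaJson.getBytes(StandardCharsets.UTF_8))
                    .build();
            // Register the schema before any BYTES producer attaches, so
            // later typed consumers have something to check against.
            admin.schemas().createSchema("persistent://public/default/my-topic", info);
        }
    }
}
```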

The major challenge is setting the proper schema info when producing to Pulsar topics from the Flink sink connector. In the SQL sink we don't have a POJO class and serialization is managed by Flink formats, so if we want to set up the Pulsar schema correctly we need to map a Flink SerializationSchema to a Pulsar schema, which is very inconvenient. For the Avro and raw formats it's doable (see the sketch below); for the JSON format we don't know how to generate a Pulsar JSON schema from the Flink RowType.
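For the Avro case, here is a rough sketch of the mapping described as doable above, using Flink's AvroSchemaConverter from flink-avro; the surrounding wiring and names are assumptions, not the connector's actual code:

```java
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public final class FlinkToPulsarSchema {

    /** Derive a Pulsar AVRO SchemaInfo from the sink table's RowType. */
    public static SchemaInfo avroSchemaInfoFor(RowType rowType) {
        // Flink can render a RowType as an Avro schema; Pulsar stores the
        // Avro schema definition as UTF-8 bytes inside SchemaInfo.
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        return SchemaInfo.builder()
                .name("flink-sql-sink")
                .type(SchemaType.AVRO)
                .schema(avroSchema.toString().getBytes(StandardCharsets.UTF_8))
                .build();
    }

    private FlinkToPulsarSchema() {
    }
}
```

No equivalent converter exists for producing a Pulsar JSON schema from a RowType, which is the open question above.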

imaffe commented 2 years ago

I'll mark this issue as pending (icebox) and note it in the documentation. We should also give it another try once the related Pulsar fixes are applied.

imaffe commented 2 years ago

Same as #128: decreasing the story points.