Describe the bug
When using the FlinkPulsarSink connector with protobuf special-mode serialization on a Pulsar topic, the schema comparison fails if the protobuf schema has changed. Only the first schema version is ever uploaded to the Pulsar schema registry.
Here is the exception:
java.lang.RuntimeException: Writing to a topic which have incompatible schema
thrown from SchemaUtils at line 95, in schemaEqualsIgnoreProperties or compatibleSchema.
Environment: Flink 1.13.2, Pulsar 2.8.0, pulsar-flink-connector_2.12 1.12.4.3
To Reproduce
Steps to reproduce the behavior:

```java
PulsarSerializationSchema<Item> serialization = new PulsarSerializationSchemaWrapper.Builder<>(new SerializationItemProtoSchema())
        .useSpecialMode(Schema.PROTOBUF(Item.class))
        .setKeyExtractor((SerializableFunction<Item, byte[]>) obj -> obj.getID().getUuID().getBytes())
        .build();

FlinkPulsarSink<Item> aggregateOutProducer = new FlinkPulsarSink<>(
        adminUrl,
        Optional.of(aggregateOutTopic),
        clientConfig,
        new Properties(),
        serialization,
        PulsarSinkSemantic.AT_LEAST_ONCE
);
```
1. Run the Flink job; it uploads the schema to the Pulsar schema registry and produces some messages on the topic.
2. Update the protobuf message structure by adding a new field (see the sketch after these steps).
3. Rebuild and rerun the Flink job with the new protobuf class version. It throws the exception:

```
java.lang.RuntimeException: Writing to a topic which have incompatible schema
	at org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils.uploadPulsarSchema(SchemaUtils.java:95)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.uploadSchema(FlinkPulsarSinkBase.java:302)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.open(FlinkPulsarSinkBase.java:267)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.open(FlinkPulsarSink.java:41)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
```
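For illustration only, the kind of change made in step 2 might look like the following; the Item message definition here is hypothetical, since the real .proto file is not part of this report:

```proto
syntax = "proto3";

// Hypothetical Item message; the real definition is application-specific.
message Item {
  string uuid = 1;
  string name = 2;
  // New field added between the two runs of the job. Adding a field with a
  // fresh tag number is a change that compatible-schema strategies
  // typically allow.
  string description = 3;
}
```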
Expected behavior
On the second run with the new schema version, we expect the Pulsar sink to upload the new schema version to the registry.
If we do this upload manually through the pulsar-admin CLI before running the Flink job, the schema registry is updated and the job runs correctly.
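For reference, a sketch of that manual workaround, assuming the new schema has been exported to a schema definition file; the topic name and file name are placeholders:

```sh
# Upload the new schema version to the registry before starting the Flink job.
# item-schema.json is a Pulsar schema definition file; both names are placeholders.
bin/pulsar-admin schemas upload \
  --filename item-schema.json \
  persistent://public/default/aggregate-out
```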
We do not understand why a new schema is uploaded to the schema registry only if there is no existing schema (SchemaUtils, uploadPulsarSchema()).
As far as we understand, schema evolution compatibility strategies are managed on the Pulsar server, so the pulsar-flink connector should always try to upload the new schema and let the Pulsar server decide whether it is compatible (see the command sketch below).
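For completeness, the strategy in question is configured per namespace on the broker, for example (the namespace name is a placeholder):

```sh
# Inspect and change the schema compatibility strategy for a namespace.
bin/pulsar-admin namespaces get-schema-compatibility-strategy public/default
bin/pulsar-admin namespaces set-schema-compatibility-strategy \
  --compatibility BACKWARD \
  public/default
```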
When using the Pulsar producer Java client directly, new schema versions are automatically uploaded to the registry, and the broker tells the client whether the new schema is accepted (depending on the schema compatibility strategy).
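A minimal sketch of that client behavior, assuming the same protobuf-generated Item class; the service URL and topic are placeholders:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

// Item is assumed to be the protobuf-generated class used by the job.
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650") // placeholder
        .build();

// create() registers the PROTOBUF schema of Item with the broker; the broker
// applies the namespace compatibility strategy and fails the producer only if
// the new schema version is incompatible.
Producer<Item> producer = client.newProducer(Schema.PROTOBUF(Item.class))
        .topic("persistent://public/default/aggregate-out") // placeholder
        .create();
```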
Is there a reason why the pulsar-flink connector only uploads the first schema version of a topic and does not try to upload new schema versions automatically?
We have dropped SchemaUtils in the latest release; the schema compatibility check should be handled on the Pulsar side. Can you confirm this on the master branch?