datafibers-community / df_data_service

DataFibers Data Service
http://www.datafibers.com
Apache License 2.0

Flink transform topic sink does not work #166

Open datafibers opened 7 years ago

datafibers commented 7 years ago

Once Flink transforms the data into a new topic, sinking that topic into the database does not work; it fails with the error below.

org.apache.kafka.connect.errors.DataException: flink_sink
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:407)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 8
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:269)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:257)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:71)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:175)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:407)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
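
The trace shows the sink connector's AvroConverter deserializing the record with its embedded schema id (8) and then asking the Schema Registry for the matching version under the sink topic's "<topic>-value" subject; because the Flink output topic has no registered subject, the registry answers with error 40401. A minimal sketch to confirm the missing subject, assuming a registry at http://localhost:8081 and the sink topic name flink_sink (both placeholders, not taken from the project config):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class CheckSinkSubject {
    public static void main(String[] args) {
        // Hypothetical registry URL and topic name; substitute your own.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // The sink's AvroConverter resolves schemas under "<topic>-value";
        // a 40401 here means that subject has no registered versions yet.
        String subject = "flink_sink-value";
        try {
            System.out.println("Latest schema for " + subject + ": "
                    + client.getLatestSchemaMetadata(subject).getSchema());
        } catch (Exception e) {
            System.out.println("Subject " + subject + " not found: " + e.getMessage());
        }
    }
}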

datafibers commented 7 years ago

This is fixed by replicating the topic-value subject. @SchubertZhu to test.
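
A minimal sketch of that replication, assuming the fix copies the latest schema from the transform's input subject to the sink topic's subject through the Confluent Schema Registry client; the registry URL and both subject names below are placeholders for illustration:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;

public class ReplicateSubject {
    public static void main(String[] args) throws Exception {
        // Hypothetical names: adjust the registry URL and topic names to your setup.
        String registryUrl = "http://localhost:8081";
        String sourceSubject = "source_topic-value"; // subject already registered for the input topic
        String targetSubject = "flink_sink-value";   // subject the sink's AvroConverter fails to find

        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);

        // Fetch the latest schema registered under the source subject...
        SchemaMetadata latest = client.getLatestSchemaMetadata(sourceSubject);
        Schema schema = new Schema.Parser().parse(latest.getSchema());

        // ...and register the same schema under the sink topic's subject,
        // so the sink connector can resolve it when converting messages.
        int id = client.register(targetSubject, schema);
        System.out.println("Registered " + targetSubject + " with schema id " + id);
    }
}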