robinhood / faust

Python Stream Processing

Kafka connect cannot convert to avro #369

Open ankitchiplunkar opened 5 years ago

ankitchiplunkar commented 5 years ago

Steps to reproduce


  1. A Faust agent writes to the topic in Avro format.
  2. Connected the Faust topic to a Kafka Connect sink connector.
  3. Kafka Connect failed with a "cannot deserialize Avro" error.

Expected behavior

The connector should deserialize the Avro messages written by Faust and process them without errors.

Actual behavior

The connector fails to deserialize the messages and the sink task is killed (full traceback below).

Full traceback

[2019-06-20 13:47:07,469] ERROR WorkerSinkTask{id=faust-btc-confirmed-blocks-data-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic faust-btc-confirmed-blocks-data to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2019-06-20 13:47:07,471] ERROR WorkerSinkTask{id=faust-btc-confirmed-blocks-data-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-06-20 13:47:07,471] INFO [Consumer clientId=consumer-1, groupId=connect-faust-btc-confirmed-blocks-data] Member consumer-1-d85e54c6-9b61-4b85-b8bb-dabb9f39d51f sending LeaveGroup request to coordinator ip-172-31-0-16.ec2.internal:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)
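
For context (this explanation is not from the thread itself): Confluent's AvroConverter expects every payload to begin with a magic byte 0x00 followed by a 4-byte big-endian schema ID. "Unknown magic byte!" means the first byte was something else, and the "id -1" in the log is the deserializer's placeholder for a schema ID it never managed to read. A minimal sketch to inspect what is actually on the topic, assuming kafka-python is installed and a broker runs at localhost:9092 (both assumptions):

import struct

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "faust-btc-confirmed-blocks-data",
    bootstrap_servers="localhost:9092",  # assumption: adjust to your cluster
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)

for record in consumer:
    for name, payload in (("key", record.key), ("value", record.value)):
        if payload is None:
            continue
        if payload[0] == 0 and len(payload) >= 5:
            # magic byte 0x00 present: read the 4-byte big-endian schema ID
            schema_id = struct.unpack(">I", payload[1:5])[0]
            print(f"{name}: Confluent wire format, schema id {schema_id}")
        else:
            # payloads like this make AvroConverter fail with "Unknown magic byte!"
            print(f"{name}: first byte 0x{payload[0]:02x}, not Confluent-framed")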

ask commented 5 years ago

How are you writing Avro in Faust?

ankitchiplunkar commented 5 years ago

We are using the FaustSerializer from python-schema-registry-client:

from schema_registry.client import SchemaRegistryClient
from schema_registry.serializers import FaustSerializer

# create an instance of the `SchemaRegistryClient`
# (SCHEMA_REGISTRY is the schema registry URL, defined elsewhere)
client = SchemaRegistryClient(url=SCHEMA_REGISTRY)

# "blocks" is the subject name; block_schema is the Avro schema, defined elsewhere
avro_block_serializer = FaustSerializer(client, "blocks", block_schema)

# function used to register the codec
def avro_block_codec():
    return avro_block_serializer

This serializer is then passed explicitly when the agent sends to the topic:

await topics.blocks_data_topic.send(key=str(block.blockNumber), value=block, value_serializer=avro_block_serializer)
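
For completeness, the thread does not show how the avro_block_codec factory above is wired in; a minimal sketch of registering it with Faust so the serializer can also be referenced by name (the codec name "avro_blocks" is an assumption, not from the thread):

from faust.serializers import codecs

# register the serializer under a name usable as value_serializer="avro_blocks";
# the name is illustrative, not from the original thread
codecs.register("avro_blocks", avro_block_serializer)
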
OneCricketeer commented 4 years ago

It's unclear whether AvroConverter is also used for your Kafka Connect key.converter, but if it is, you would expect to see that error, since your key is just a plain string rather than Confluent-framed Avro.
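
If that is the case, a minimal sketch of the relevant sink connector settings, keeping AvroConverter for values only (the registry URL is a placeholder):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081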

Other than that, I would recommend using kafka-avro-console-consumer rather than Kafka Connect to verify that the Confluent Avro deserializer works on the topic.
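
For example (the broker and schema registry addresses below are placeholders, not from the thread):

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic faust-btc-confirmed-blocks-data \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081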