Open PiyushTaiwade opened 5 years ago
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test-backup,demo-backup,ncdx.response.private,ncdx.isuser.private,ncdx.incoming.event.private,ncdx.event,ncdx-events-streamproc-kafka-confluent-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition,io-mimiro-ncdx-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition,ncdx-events-streamproc-kafka-confluent-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog,io-mimiro-ncdx-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
s3.region=eu-west-1
s3.bucket.name=kafka-xxxxxx-xxxxx-xxxxx-xxxx-testing
s3.part.size=5242880
s3.ssea.name=aws:kms
flush.size=3
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
timestamp.field=local_timestamp
path.format=YYYY-MM-dd-HH
partition.duration.ms=3600000
locale=EU
timezone=UTC
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE
bootstrap.servers=192.x.xx.xx:9092,192.x.xx.xxx:9092,192.x.xx.xxx:9092
plugin.path=/opt/confluent-5.1.2/share/kafka/plugins
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/opt/confluent-5.1.2/tmp/connect.offsets
offset.flush.interval.ms=10000
I am not able to resolve the error. Can anyone help?
ncdx-events-streamproc-kafka-confluent-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
The topic listed is not using Avro for either its key or value, so you cannot use AvroConverter for this topic, specifically, and possibly others in your list.
You'll need to run a separate set of Connector sinks with different converter values that match the format of the data in those topics.
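A minimal sketch of what such a separate connector could look like, assuming the changelog topic should simply be copied to S3 as raw bytes. The connector name `s3-sink-bytes` is hypothetical, and the S3 settings are copied from the original post. Converters set in a connector's own config take precedence over the worker-level defaults, so the Avro converters in the worker file would not apply to this connector.

```properties
# Hypothetical second connector for topics whose data is not Avro.
name=s3-sink-bytes
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
# Only the topic named in the error is listed here; add others as needed.
topics=ncdx-events-streamproc-kafka-confluent-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
# Connector-level converters override the worker defaults.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# ByteArrayFormat writes the record values to S3 unchanged.
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.region=eu-west-1
s3.bucket.name=kafka-xxxxxx-xxxxx-xxxxx-xxxx-testing
flush.size=3
```

The topics that really do contain Avro would stay in the original connector with AvroConverter.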
I don't understand what this has to do with Avro. He commented out the Avro formatter and is using the ByteArrayFormat.
"I don't understand what this has to do with avro"
The stack trace? It includes: at io.confluent.connect.avro.AvroConverter.toConnectData
As far as I can tell, key.converter has always been using Avro, since it wasn't set in the initial post and not commented in the most recent one.
Plus, the format.class is separate from the converter configs; that is what writes the files. Converters are only used when consuming the messages.
The "Tolerance exceeded" exception occurs when the data you produced to Kafka is not in the expected format.
@pratik5156 It's for any retryable exception, not only serialization.
That error also wraps the real exception.
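For reference, the "Tolerance exceeded" wrapper comes from Kafka Connect's error-handling layer, which is configurable per connector in Kafka Connect 2.0 and later. The sketch below uses the standard `errors.*` properties; the dead letter queue topic name `dlq-s3-sink` is a hypothetical placeholder. Note that this only skips and logs the records that fail conversion, so those records will not reach S3. It does not fix a converter mismatch.

```properties
# Default is "none", which fails the task on the first conversion error.
errors.tolerance=all
# Log each failed operation, including the offending record.
errors.log.enable=true
errors.log.include.messages=true
# Sink connectors can route failed records to a dead letter queue topic.
# The topic name below is a placeholder.
errors.deadletterqueue.topic.name=dlq-s3-sink
# Attach error context (exception, original topic/partition/offset) as headers.
errors.deadletterqueue.context.headers.enable=true
```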
When I change the converters as follows (replacing AvroConverter with StringConverter), I can get data.
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
If the data is JSON, then JsonConverter would be a better choice than StringConverter.
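A sketch of that suggestion, assuming the values are plain JSON objects without the schema/payload envelope that JsonConverter expects when schemas are enabled. Pairing it with the S3 connector's JsonFormat is also an assumption about how the files should be written.

```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
# Plain JSON without an embedded schema needs schemas.enable=false.
value.converter.schemas.enable=false
# Write JSON records to S3 (assumed to be the desired output format).
format.class=io.confluent.connect.s3.format.json.JsonFormat
```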
ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: ncdx-events-streamproc-kafka-confluent-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2019-03-14 09:58:45,938] ERROR WorkerSinkTask{id=s3-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-03-14 09:58:45,939] INFO [Consumer clientId=consumer-1, groupId=connect-s3-sink] Sending LeaveGroup request to coordinator ip-192-0-xx-xxx.eu-west-1.compute.internal:9092 (id: 21xxxxxxxx6 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)