streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

SerializationException: Error registering Avro schema #500

Open neeru-bhaskar opened 1 year ago

neeru-bhaskar commented 1 year ago

Hi, I am trying to create an S3 source connector with the configuration below:

curl \
  -i -X PUT -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8080/connectors/s3_source_connector/config \
  -d '{
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "name": "s3_source_connector",
        "topic": "topicname",
        "tasks.max": 1,

        "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
        "fs.listing.interval.ms": 10000,
        "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
        "file.filter.regex.pattern": "test\\.csv",

        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",

        "aws.access.key.id":"key",
        "aws.secret.access.key":"secret",
        "aws.s3.bucket.name":"bucket",
        "aws.s3.region":"us-east-1",
        "aws.secret.session.token": "session_token",
        "aws.s3.bucket.prefix": "bucket",
        "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader",

        "skip.headers": "1",
        "offset.attributes.string": "uri",

        "tasks.file.status.storage.bootstrap.servers":"bootstrapserver",
        "tasks.file.status.storage.topic":"storage-topic",
        "tasks.file.status.storage.topic.partitions":10,
        "tasks.file.status.storage.topic.replication.factor":1,
        "tasks.file.status.storage.topic.creation.enable": false,

        "filters":"CSVFilter",
        "filters.CSVFilter.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
        "filters.CSVFilter.extractColumnName": "headers",
        "filters.CSVFilter.trimColumn": "true",
        "filters.CSVFilter.separator": ",",

        "tasks.file.status.storage.producer.security.protocol": "SSL",
        "tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm": "",
        "tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
        "tasks.file.status.storage.producer.ssl.key.password": "",
        "tasks.file.status.storage.producer.ssl.keystore.location": "keystore.jks",
        "tasks.file.status.storage.producer.ssl.keystore.password": "password",
        "tasks.file.status.storage.producer.ssl.truststore.location": "truststore.jks",
        "tasks.file.status.storage.producer.ssl.truststore.password": "password",
        "tasks.file.status.storage.consumer.security.protocol": "SSL",
        "tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm": "",
        "tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
        "tasks.file.status.storage.consumer.ssl.key.password": "",
        "tasks.file.status.storage.consumer.ssl.keystore.location": "keystore.jks",
        "tasks.file.status.storage.consumer.ssl.keystore.password": "password",
        "tasks.file.status.storage.consumer.ssl.truststore.location": "truststore.jks",
        "tasks.file.status.storage.consumer.ssl.truststore.password": "password",

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

    }'
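
The connector is created and the task starts; its state can be verified with the standard Connect REST status endpoint (host and port match the request above):

curl -s http://localhost:8080/connectors/s3_source_connector/status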

However, I am getting the following exception:

[2023-07-13 22:49:25,231] ERROR Error encountered in task jdbc_sink_connector_s3_src_connect-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = SourceRecord{sourcePartition={uri=s3://bucket-aws-useast1-apps-dev-1-dev/bucket/test.csv}, sourceOffset={position=132, rows=2, timestamp=1689288565168}} ConnectRecord{topic='topicname', kafkaPartition=null, key=null, keySchema=null, value=Struct{empName=John}, valueSchema=Schema{STRUCT}, timestamp=1689288565168, headers=ConnectHeaders(headers=[ConnectHeader(key=connect.file.name, value=bucket/test.csv, schema=Schema{STRING}), ConnectHeader(key=connect.file.uri, value=s3://bucket-aws-useast1-apps-dev-1-dev/bucket/test.csv, schema=Schema{STRING}), ConnectHeader(key=connect.file.contentLength, value=132, schema=Schema{INT64}), ConnectHeader(key=connect.file.lastModified, value=1689275469000, schema=Schema{INT64}), ConnectHeader(key=connect.file.s3.object.summary.key, value=bucket/test.csv, schema=Schema{STRING}), ConnectHeader(key=connect.file.s3.object.summary.etag, value=df96017ba0e96ddacd2de1736ade34eb, schema=Schema{STRING}), ConnectHeader(key=connect.file.s3.object.summary.bucketName, value=bucket, schema=Schema{STRING}), ConnectHeader(key=connect.task.hostname, value=kafka-connect-s3sourceconn-79cdd7466f-fnjzw, schema=Schema{STRING})])}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic topicname :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"empName","type":["null","string"],"default":null}]}
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    ... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
    ... 17 more
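
From the nested RestClientException, the registry client is receiving a response that starts with '<', i.e. HTML rather than JSON, which usually means the serializer is talking to something other than the Schema Registry REST API (a wrong port, a proxy, or a login/error page). A quick sanity check against the registry (hypothetical host and port; substitute the real endpoint):

# A healthy registry answers with JSON, e.g. [] or ["topicname-value"];
# an HTML response here reproduces the '<' (code 60) parse error above.
curl -v http://schema-registry:8081/subjects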

I tried adding the key and value converter settings below (schema registry URL, keystore, and truststore) to the connector config, but I am getting the same error:

        "key.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",

        "value.converter.schema.registry.ssl.keystore.password": "password",
        "value.converter.schema.registry.ssl.truststore.password": "password",
        "value.converter.schema.registry.ssl.truststore.location": "truststore.jks",
        "value.converter.schema.registry.url": "sehema-registry-url",
        "value.converter.schema.registry.ssl.key.password": "",
        "value.converter.schema.registry.ssl.keystore.location": "keystore.jks",
        "value.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",

Any suggestions on how to fix this?

ghost commented 1 year ago

Hey, I think the issue is the following:

Does this help? Do you have a header row in the CSV to check this?
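
For context: with "skip.headers": "1" and "filters.CSVFilter.extractColumnName": "headers", the skipped first line is stored in the record's headers field and used to name the columns, so the file must start with a header row. A minimal test.csv matching the record logged above (contents are a guess) would be:

empName
John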

Best Marko

github-actions[bot] commented 10 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.