tabular-io / iceberg-kafka-connect

Apache License 2.0

Failed to deserialize data for topic FBNK_ACCOUNT_CDC to Avro: #157

Closed · NhatDuy11 closed this issue 10 months ago

NhatDuy11 commented 10 months ago

This is my Iceberg sink connector configuration:

    {
      "name": "test_sink_fbnk_cdc",
      "config": {
        "iceberg.catalog.s3.endpoint": "http://########",
        "iceberg.catalog.client.region": "us-east-1",
        "iceberg.catalog.s3.path.style.access": "true",
        "iceberg.catalog.s3.aws.credentials.provide": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
        "iceberg.catalog.s3.access-key-id": "##########",
        "iceberg.catalog.s3.secret-access-key": "##########",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "iceberg.catalog.uri": "#################",
        "value.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081",
        "iceberg.catalog.warehouse": "s3a://***",
        "key.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081",
        "name": "test_sink_fbnk_cdc",
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "config.action.reload": "restart",
        "errors.tolerance": "none",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "topics": "FBNK_ACCOUNT_CDC",
        "iceberg.tables": "cnd.auto_fbnkaccount_cdc",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables.schema-force-optional": "true",
        "iceberg.tables.evolve-schema-enabled": "true",
        "iceberg.catalog": "iceberg",
        "iceberg.control.topic": "fbnk_account_cdc"
      }
    }

Currently I am getting this error:

    Executing stage 'KEY_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='FBNK_ACCOUNT_CDC', partition=0, offset=5660, timestamp=1699856363507, timestampType=CreateTime}.
    org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic FBNK_ACCOUNT_CDC to Avro:
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
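
For context, the "Unknown magic byte" error means the bytes handed to AvroConverter do not start with the Confluent Schema Registry wire-format header (a 0x00 magic byte followed by a 4-byte schema ID), so they cannot be decoded as registry-encoded Avro. A minimal sketch for inspecting the raw key bytes of the topic with a plain byte-array consumer; the bootstrap address and group id below are placeholders, not values from this setup:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WireFormatCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "wire-format-check");       // throwaway consumer group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("FBNK_ACCOUNT_CDC"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                byte[] key = record.key();
                // Confluent wire format: 0x00 magic byte, 4-byte schema ID, then the Avro payload.
                boolean keyLooksLikeConfluentAvro = key != null && key.length > 5 && key[0] == 0x00;
                System.out.printf("offset=%d keyLooksLikeConfluentAvro=%b%n",
                        record.offset(), keyLooksLikeConfluentAvro);
            }
        }
    }
}
```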

This is a screenshot of the data schema in the Kafka topic:

[screenshot: topic data schema]

I tried setting key.converter to "org.apache.kafka.connect.storage.StringConverter", and then I get this error: Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name

Can someone help me with this issue? Thanks so much, guys.

bryanck commented 10 months ago

The error implies that the record cannot be deserialized by Kafka Connect, so that is happening before it hits the sink. How is the source producing the record?

NhatDuy11 commented 10 months ago

Hi @bryanck, this is my record:

[screenshot: record]

bryanck commented 10 months ago

My thought is that the original error is caused by the key converter; the key doesn't appear to be in Avro format based on the errors noted above. Setting the key converter to a string converter got you further along. The javax.security.auth.login.LoginException is usually caused by some Hadoop code being called. Do you have the full stack trace? Also, what environment are you running this in?

NhatDuy11 commented 10 months ago

Hi @bryanck, I am using Confluent in a k8s environment. Can you tell me if there is any way to fix the javax.security.auth.login.LoginException error?

NhatDuy11 commented 10 months ago

When I set key.converter to the string converter, I immediately got this error: Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name

bryanck commented 10 months ago

Do you have a stack trace for that error? I would keep the key converter as a string converter.
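
For reference, these are the relevant converter settings from the configuration posted above, keeping the key as a plain string while the values stay Avro:

```json
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081"
```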

NhatDuy11 commented 10 months ago

@bryanck, I just configured the key converter to a string converter and immediately got this error: Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name. I saw that error when viewing the logs from the connector.

NhatDuy11 commented 10 months ago

@bryanck, currently I am using the Confluent UI and using the connector to sink records to MinIO (S3-compatible storage).

bryanck commented 10 months ago

Is that the connector throwing the error, i.e. from the Kafka Connect logs, or is it the tool you are using to view the logs throwing the error?

NhatDuy11 commented 10 months ago

@bryanck Yesterday, I tried it and everything worked fine. Today I tried running it again and got the error. I'm not sure if it's the iceberg connector or the environment I'm using.

NhatDuy11 commented 10 months ago

Hi @bryanck, now I want to add this code: UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("hduser")); Which layer of the sink's Java code should I add it to? Currently I want to sink the records to MinIO (S3).

bryanck commented 10 months ago

You can try setting the HADOOP_USER_NAME env variable. You can also set that as a System property.
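
In a Kubernetes setup like the one described above, the simplest approach is to set HADOOP_USER_NAME as an environment variable on the Connect worker pods, which requires no code change. Purely as an illustration of the two code-level options mentioned in this thread; the class name and main method below are hypothetical stand-ins, not part of the connector:

```java
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopUserSketch {
    public static void main(String[] args) throws Exception {
        // Option 1: set HADOOP_USER_NAME as a System property before any Hadoop code runs.
        // Hadoop's UserGroupInformation reads this name from the env variable or the
        // System property of the same name, which is what bryanck suggests above.
        System.setProperty("HADOOP_USER_NAME", "hduser");

        // Option 2: set the login user explicitly, as mentioned earlier in the thread.
        UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("hduser"));

        System.out.println("Current user: " + UserGroupInformation.getCurrentUser());
    }
}
```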

NhatDuy11 commented 10 months ago

Hi @bryanck, I just need to configure HADOOP_USER_NAME like this, right?

[screenshot: configuration]

bryanck commented 10 months ago

Yes that looks right.

bryanck commented 10 months ago

This is an issue with the Hadoop config, so I'm going to close this. Feel free to reopen or open a new ticket with additional info.