jaehyeon-kim / flink-demos

Apache Flink (Pyflink) and Related Projects
https://jaehyeon.me/tags/apache-flink/

IAMClientCallbackHandler could not be found #21

Open Nishtha1994 opened 4 months ago

Nishtha1994 commented 4 months ago

Hi @jaehyeon-kim , thank you for adding the series on setting up flink on aws + msk kafka access via iam.

I am using managed Flink on AWS and trying to read from Kafka on MSK, and I am having an issue with this -

```
org.apache.flink.kafka.shaded.org.apache.kafka.common.config.ConfigException: Invalid value software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class: Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found
```

I am using this config in my connector: `sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler`. I have packaged my jar(s) into an uber jar; I think I'll have to add this to the classpath differently on managed Flink. Any ideas? I saw you faced similar problems, so I removed the flink-kafka-sql-connector jar and just added the flink-kafka-connector, and it still failed.
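For context, a minimal sketch of the connector options typically used for MSK IAM auth in a PyFlink table definition. The broker endpoint is a placeholder, and the `properties.` prefix is how the Flink Kafka connector passes settings through to the underlying Kafka client; the error above means the callback handler class is missing from the job's classpath, i.e. the `aws-msk-iam-auth` jar did not make it into the uber jar.

```python
# Sketch (assumptions: PyFlink table connector, MSK IAM auth via aws-msk-iam-auth).
# These are the standard Kafka/MSK IAM settings, not taken verbatim from the repo.
msk_iam_options = {
    "connector": "kafka",
    "properties.bootstrap.servers": "<broker-endpoints>",  # placeholder
    "properties.security.protocol": "SASL_SSL",
    "properties.sasl.mechanism": "AWS_MSK_IAM",
    "properties.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "properties.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

# A WITH clause in the table DDL would list these same key/value pairs.
with_clause = ",\n".join(f"  '{k}' = '{v}'" for k, v in msk_iam_options.items())
print(with_clause)
```

If the ClassNotFound error persists with these options, the usual culprit is the `aws-msk-iam-auth` dependency (and its transitive deps) being excluded or relocated by the shade plugin when building the uber jar.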

jaehyeon-kim commented 4 months ago

Hello,

Do you use this config? https://github.com/jaehyeon-kim/flink-demos/blob/master/pyflink-getting-started-on-aws/remote/processor.py#L56


Nishtha1994 commented 4 months ago

Yes! I have these 4 configs in, and I am using exactly the same setup. Is there a different way to add a jar to the classpath on managed Flink, apart from the uber jar?

jaehyeon-kim commented 3 months ago

Hello,

Creating an uber jar is the only option. I'm not sure what makes it fail. Can you check those example apps?

https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python

Nishtha1994 commented 3 months ago

Okay, I had to upgrade the Flink version to 1.19 on AWS. It's now failing with the error in the attached screenshot (IMG_8514).

I've been using the flink kafka connector instead of the flink sql kafka connector, at version 1.19. How do I check if there is a version mismatch?

jaehyeon-kim commented 3 months ago

The last line indicates the python process failed with an error. Have you checked if it runs fine locally?

Nishtha1994 commented 3 months ago

It does work locally. Does the json format work with flink-kafka-connector on AWS, or does it need flink-sql-kafka-connector? I am guessing there is a deserialization issue here, given I can connect to my MSK brokers.

jaehyeon-kim commented 3 months ago


From memory, we don't need flink-sql-kafka-connector if we add flink-kafka-connector and the Kafka client dependencies. Glad that you can connect to your MSK cluster.

It is weird that it works locally but not on AWS managed Flink, but I also had a similar issue earlier. You may try to read the messages as bytes and deserialise them with a UDF if you think deserialisation is the issue - https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/raw/
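The deserialisation logic such a UDF would wrap could look like the sketch below. In PyFlink it would be registered with the `@udf` decorator and applied to the raw `BYTES` column; here it is shown as a plain function, and the name and the decision to return `None` on bad records are illustrative assumptions:

```python
import json

def parse_raw_value(raw: bytes):
    """Decode a Kafka record value read via the 'raw' format into a dict.
    Returning None for undecodable records lets bad input surface in the
    output instead of failing the whole job on a deserialization error."""
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
```

This also makes it easy to test the exact bytes coming off the topic locally before involving the managed Flink runtime.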

Nishtha1994 commented 3 months ago

Even when I use the raw format it fails. Same with the json format. This is the error I see:

```
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 123, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 34be653c, value = [B@34be653c).
```

Guess the DataStream API is the way to go for us, since we cannot use any other custom deserialization :(