aws / aws-msk-iam-auth

Enables developers to use AWS Identity and Access Management (IAM) to connect to their Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters.
Apache License 2.0

Error with flink-sql-connector-kafka #129

Closed: jaehyeon-kim closed this issue 11 months ago

jaehyeon-kim commented 11 months ago

Hello, I am trying to connect to an Amazon MSK cluster using IAM authentication, and the connection fails with the following error (the full error message is in error.txt):

...

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: class software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to class org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler (software.amazon.msk.auth.iam.IAMClientCallbackHandler and org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler are in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @7ed85b97)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:513)
        ... 13 more
Caused by: java.lang.ClassCastException: class software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to class org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler (software.amazon.msk.auth.iam.IAMClientCallbackHandler and org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler are in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @7ed85b97)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.createClientCallbackHandler(SaslChannelBuilder.java:305)
        at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:148)
        ... 17 more

I tried both adding multiple JAR files and building a single fat JAR (I plan to deploy the app to Kinesis Data Analytics later). Both approaches raise the same error. The POM file for the fat JAR is in pom.xml, and the relevant app configuration is in src.py.

Could you please tell me how to fix this error?

src.py

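import os

from pyflink.table import EnvironmentSettings, TableEnvironment

# Resolve JAR paths relative to this script (must be defined before either option uses it).
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
# Assumed: a streaming TableEnvironment; the original snippet does not show how table_env is created.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

### option 1: single fat jar file
PIPELINE_FAT_JAR = "pyflink-getting-started-1.jar"
table_env.get_config().set(
    "pipeline.jars", f"file://{os.path.join(CURRENT_DIR, 'lib', PIPELINE_FAT_JAR)}"
)

### option 2 (alternative to option 1): multiple JAR files, separated by semicolons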
FLINK_SQL_CONNECTOR_KAFKA = "flink-sql-connector-kafka-1.15.2.jar"
AWS_MSK_IAM_AUTH = "aws-msk-iam-auth-1.1.7-all.jar"
KAFKA_CLIENTS = "kafka-clients-2.8.1.jar"
table_env.get_config().set(
    "pipeline.jars",
    f"file://{os.path.join(CURRENT_DIR, 'lib', FLINK_SQL_CONNECTOR_KAFKA)};file://{os.path.join(CURRENT_DIR, 'lib', AWS_MSK_IAM_AUTH)};file://{os.path.join(CURRENT_DIR, 'lib', KAFKA_CLIENTS)}",
)

### table definition
def create_source_table(
    table_name: str, topic_name: str, bootstrap_servers: str, startup_mode: str
):
    return f"""
    CREATE TABLE {table_name} (
        event_time TIMESTAMP(3),
        ticker VARCHAR(6),
        price DOUBLE
    )
    WITH (
        'connector' = 'kafka',
        'topic' = '{topic_name}',
        'properties.bootstrap.servers' = '{bootstrap_servers}',
        'properties.group.id' = 'source-group',
        'format' = 'json',
        'scan.startup.mode' = '{startup_mode}',
        'properties.security.protocol' = 'SASL_SSL',
        'properties.sasl.mechanism' = 'AWS_MSK_IAM',
        'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
        'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler'
    )
    """

pom.xml

<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.amazonaws</groupId>
    <artifactId>pyflink-getting-started</artifactId>
    <version>1</version>

    <properties>
        <flink.version>1.15.2</flink.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>software.amazon.msk</groupId>
            <artifactId>aws-msk-iam-auth</artifactId>
            <version>1.1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>

    </build>

</project>
jaehyeon-kim commented 11 months ago

I found that I have to use flink-connector-kafka instead of flink-sql-connector-kafka. See https://github.com/aws-samples/amazon-kinesis-data-analytics-blueprints/tree/main/apps/python-table-api/msk-serverless-to-s3-tableapi-python
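Why the swap helps, as read from the stack trace: flink-sql-connector-kafka bundles the Kafka client relocated under `org.apache.flink.kafka.shaded.org.apache.kafka`, so its SASL code casts the configured handler to the shaded `AuthenticateCallbackHandler`. `IAMClientCallbackHandler` implements the unrelocated `org.apache.kafka.common.security.auth.AuthenticateCallbackHandler`, hence the `ClassCastException`; flink-connector-kafka uses the plain `kafka-clients` classes instead. A quick way to see which variant a given JAR carries is to list its entries. This sketch uses only Python's `zipfile`; `kafka_handler_variants` is a hypothetical helper:

```python
import zipfile

# Class-file path of the callback-handler interface, unshaded and as relocated
# inside flink-sql-connector-kafka (package name taken from the stack trace).
UNSHADED = "org/apache/kafka/common/security/auth/AuthenticateCallbackHandler.class"
SHADED = "org/apache/flink/kafka/shaded/" + UNSHADED


def kafka_handler_variants(jar_path: str) -> dict:
    """Report which AuthenticateCallbackHandler variants a JAR contains."""
    with zipfile.ZipFile(jar_path) as jar:
        names = set(jar.namelist())
    return {"unshaded": UNSHADED in names, "shaded": SHADED in names}
```

For example, `kafka_handler_variants("lib/pyflink-getting-started-1.jar")` on the fat JAR. If `shaded` is `True`, the bundled Kafka client presumably expects the relocated interface, which the IAM handler does not implement unless it is relocated too.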

petertriho commented 2 months ago

I was able to get this to work with flink-sql-connector-kafka by relocating (shading) org.apache.kafka with this maven-shade-plugin configuration:

<relocations>
  <relocation>
    <pattern>org.apache.kafka</pattern>
    <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
  </relocation>
</relocations>
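What the relocation does, roughly: the shade plugin rewrites class names (and references to them) that fall under the `pattern` package so they fall under `shadedPattern` instead. That presumably includes the interface reference compiled into `IAMClientCallbackHandler`, which then points at the same shaded interface flink-sql-connector-kafka casts to. The toy function below is a simplified model of the name mapping only (no bytecode rewriting, package-boundary matching assumed); `relocate` is a hypothetical helper, not part of the plugin:

```python
PATTERN = "org.apache.kafka"
SHADED_PATTERN = "org.apache.flink.kafka.shaded.org.apache.kafka"


def relocate(class_name: str, pattern: str = PATTERN, shaded: str = SHADED_PATTERN) -> str:
    """Simplified model of one <relocation>: swap a matching package prefix."""
    if class_name == pattern or class_name.startswith(pattern + "."):
        return shaded + class_name[len(pattern):]
    return class_name  # names outside the pattern are left alone


print(relocate("org.apache.kafka.common.security.auth.AuthenticateCallbackHandler"))
# org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
print(relocate("software.amazon.msk.auth.iam.IAMClientCallbackHandler"))
# software.amazon.msk.auth.iam.IAMClientCallbackHandler (the IAM class keeps its own name)
```

The first result is exactly the interface name in the `ClassCastException`, which is why the relocated handler can be cast successfully.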