Closed. lauroawsps closed this issue 1 year ago.
Hello @lauroawsps , just to confirm that we (Kinesis Data Analytics) also ran into this issue and are currently working towards a fix. I will keep you posted. Sorry for the inconvenience.
Hello @lauroawsps, we have now addressed this issue for KDA Studio. We have added support for the non-relocated Kafka connector and MSK IAM as service-managed dependencies, which means you will no longer need to rebuild and provide the connector. This change is not yet available on the console; if you would like to test it out now, you can update your application using the CLI. Here is an example request (please note that you need to provide the entire list of custom artifacts here; any additional JARs in S3 you want to include should be added to the list (reference)):
aws kinesisanalyticsv2 update-application \
  --application-name <application-name> \
  --current-application-version-id <current-application-version-id> \
  --application-configuration-update '{
    "ZeppelinApplicationConfigurationUpdate": {
      "CustomArtifactsConfigurationUpdate": [
        {
          "ArtifactType": "DEPENDENCY_JAR",
          "MavenReference": {
            "GroupId": "org.apache.flink",
            "ArtifactId": "flink-connector-kafka_2.12",
            "Version": "1.11.1"
          }
        },
        {
          "ArtifactType": "DEPENDENCY_JAR",
          "MavenReference": {
            "GroupId": "software.amazon.msk",
            "ArtifactId": "aws-msk-iam-auth",
            "Version": "1.1.0"
          }
        }
      ]
    }
  }'
Brilliant @dannycranmer! Thank you very much!
Closing since original issue seems to have been resolved.
@dannycranmer does the fix also apply to KDA applications, not just KDA Studio? We are encountering the same error in one of our KDA applications that uses Flink to consume from MSK with IAM access:
2021-12-03 14:43:09
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException]
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:474)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:474)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:381)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:293)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:233)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:788)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:750)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2338)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1725)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:348)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:235)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.security.auth.callback.UnsupportedCallbackException
at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:68)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
... 30 more
Curiously, this happens in just one of our accounts, which has the same MSK + IAM + KDA setup as our other accounts.
@irisgve in our testing this issue did not impact KDA Flink applications. The issue surfaces once a second job is created; KDA Flink applications only ever create one job. Unless there is an edge case we did not consider? Can you please provide more details? Does the job consume records initially? Does the job failover/restart before hitting this issue?
Yeah, the job consumes records initially. The error actually causes the job to failover/restart; it kills the job outright when encountered, so we had to fork the Flink Kafka source to retry on MSK IAM errors.
@dannycranmer I am seeing this error too in a KDA Flink application (not Studio). The issue we have is that this causes KDA to restart, which increases source latency.
We are working on a fix. In the meantime, if you want to move forward you can patch the library yourself. This issue occurs when the IAMSaslClientProvider is created by classloader A and is then used by classloader B. Since Amazon Kinesis Data Analytics for Flink runs a single job at a time, we can fix this via the following (link to code):
public static void initialize() {
- Security.addProvider(new IAMSaslClientProvider());
+ Security.insertProviderAt(new IAMSaslClientProvider(), 1);
}
This ensures that the most recent IAMSaslClientProvider is used. Since there is a single job running at a time, the latest should always be used.
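The failure mode can be reproduced without Kafka at all. Below is a minimal sketch (all class names here are made up, not from the MSK library) of what Flink-style child-first classloading does: two loaders each define their own copy of the same callback class, and an instance created through one loader fails the isInstance check against the other loader's copy, which is exactly the shape of the UnsupportedCallbackException above:

```java
import java.io.IOException;
import java.io.InputStream;

public class ChildFirstDemo {
    // Stand-in for AWSCredentialsCallback: any class our loader will re-define.
    public static class Callback {
        public Callback() {}
    }

    // Minimal child-first loader: re-defines Callback from its own bytes,
    // delegating everything else to the parent (loosely analogous to
    // Flink's ChildFirstClassLoader).
    static class ChildFirstLoader extends ClassLoader {
        ChildFirstLoader() {
            super(ChildFirstDemo.class.getClassLoader());
        }

        @Override
        public Class<?> loadClass(String name) throws ClassNotFoundException {
            if (!name.equals(Callback.class.getName())) {
                return super.loadClass(name);
            }
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Two "jobs", each with its own child-first classloader.
        Class<?> loadedByA = new ChildFirstLoader().loadClass(Callback.class.getName());
        Class<?> loadedByB = new ChildFirstLoader().loadClass(Callback.class.getName());

        Object callbackFromA = loadedByA.getDeclaredConstructor().newInstance();

        // Same fully qualified name, different classloaders: the check fails.
        System.out.println(loadedByA.isInstance(callbackFromA)); // true
        System.out.println(loadedByB.isInstance(callbackFromA)); // false
    }
}
```

The stale provider registered by job A's classloader keeps handing out classes from loader A, while job B's handler checks against loader B's copies, hence the fix of re-inserting a fresh provider.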
@sayantacC would you mind re-opening this ticket?
@dannycranmer Is there any way for me to catch this error from my application and retry, instead of patching the lib? I am having a hard time validating this since our servers with IAM auth enabled are in higher environments, which have a long release process.
I am wondering if something like this would work:
int retryAttemptCount = 0;
while (retryAttemptCount < RETRY_ATTEMPT_LIMIT) {
    try {
        LOGGER.info("Trying to create Flink Kafka consumer.");
        return new FlinkKafkaConsumer<T>(topics, deserializationSchema, properties);
    } catch (Exception e) {
        LOGGER.error("Failed to create consumer. Attempt: " + retryAttemptCount + " out of " + RETRY_ATTEMPT_LIMIT);
        LOGGER.error("Failure message: " + e.getMessage());
        retryAttemptCount++;
    }
}
throw new IllegalStateException("Exhausted " + RETRY_ATTEMPT_LIMIT + " attempts to create the consumer.");
But since this class is actually invoked in the callback, I doubt this will catch the error when thrown.
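That doubt seems well-founded: the consumer constructor only records configuration, while SASL authentication happens later on a fetcher thread, so a try/catch around construction never sees the failure. A rough stdlib analogy (this is not Flink code, just an illustration of the async failure pattern):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncFailureDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService fetcherThread = Executors.newSingleThreadExecutor();

        Future<?> fetch;
        boolean caughtAtSubmit = false;
        try {
            // Submitting the task (like constructing the consumer) succeeds;
            // the failure only happens later, on the fetcher thread.
            fetch = fetcherThread.submit((Runnable) () -> {
                throw new RuntimeException("SASL authentication failed");
            });
        } catch (RuntimeException e) {
            caughtAtSubmit = true; // never reached
            throw e;
        }

        System.out.println(caughtAtSubmit); // false

        try {
            fetch.get();
        } catch (ExecutionException e) {
            // The failure surfaces here, long after the constructor returned.
            System.out.println(e.getCause().getMessage());
        }
        fetcherThread.shutdown();
    }
}
```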
@dannycranmer Reopened as requested.
@vmohanan1 you could try re-creating the provider under this failure mode; example below. Disclaimer: I have not tested this. One concern is that if UnsupportedCallbackException is not specific enough, or you have high operator parallelism, you might end up inserting a lot of providers unnecessarily.
...
} catch (Exception e) {
    if (ExceptionUtils.hasCause(e, UnsupportedCallbackException.class)) {
        Security.insertProviderAt(new IAMSaslClientProvider(), 1);
    }
    ...
}
Note: ExceptionUtils is from Apache Commons.
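If pulling in Apache Commons is not an option, a minimal stdlib equivalent of hasCause is easy to sketch (Causes is a made-up helper name; it assumes the cause chain is acyclic):

```java
import javax.security.auth.callback.UnsupportedCallbackException;

public final class Causes {
    private Causes() {}

    // Walks the cause chain looking for an exception of the given type.
    public static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }
}
```

Usage would mirror the snippet above: `if (Causes.hasCause(e, UnsupportedCallbackException.class)) { ... }`.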
Thanks, I will try this. I am currently on version 1.1.1. I am wondering if upgrading to 1.1.3 will let it retry without failing the pipeline, so I will try doing both.
@dannycranmer Just noticed that new IAMSaslClientProvider() has a protected constructor. Any ETA on when the fix might be available?
Apologies, I cannot provide an ETA at this point.
Some supplementary information:
The block diagram below illustrates the problem:
We have confirmed the issue also exists for other Kafka plugins, for example SCRAM. However, when using SCRAM the majority of callbacks are usable because they are also loaded from the parent classloader (since the package prefix is javax):
javax.security.auth.callback.NameCallback
javax.security.auth.callback.PasswordCallback
If you try to define any extensions via org.apache.kafka.common.security.scram.ScramExtensionsCallback, they are rejected for the same reason as MSK IAM. The Kafka client simply swallows the exception with a debug message:
When you say Job A and Job B, you mean individual parallel units in a single job, correct?
No I am referring to two parallel Flink jobs here. However, the same problem applies to a single job stop/start (failover).
@dannycranmer we have forked the library and applied the patch that uses insertProviderAt(..., 1) instead of addProvider(...); however, we have encountered the exception again after a single KDA Flink job stopped and restarted, having run for a few days.
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@1b3a8cdb from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@13d765c3]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@1b3a8cdb from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@13d765c3]
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:474)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:474)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:381)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:293)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:233)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:484)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:104)
at com.atlassian.obsvs.kda.common.kafka.KafkaPartitionSplitReaderWithRetry.fetch(KafkaPartitionSplitReaderWithRetry.java:36)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@1b3a8cdb from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@13d765c3
at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:85)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
... 29 more
The classloader references are shown in the stack trace; do you have any insight into what's happening and whether this can be manually patched?
@dannycranmer I'm running into the same issue. I have a Flink job with a single task running in Kubernetes: 2 jobmanagers (for HA) and 3 task managers. If I kill the leader jobmanager and let it restart, I start to see the UnsupportedCallbackException.
I forked the library and wasn't able to get this suggestion working:
public static void initialize() {
- Security.addProvider(new IAMSaslClientProvider());
+ Security.insertProviderAt(new IAMSaslClientProvider(), 1);
}
but I tweaked it slightly, and this seems to fix the problem for me:
IAMSaslClientProvider iamSaslClientProvider = new IAMSaslClientProvider();
Security.removeProvider(iamSaslClientProvider.getName());
Security.addProvider(iamSaslClientProvider);
Security.insertProviderAt (which is also used by Security.addProvider under the covers) stipulates that "A provider cannot be added if it is already installed", so does the provider need to be removed first if it already exists? Security.removeProvider is a no-op if the provider doesn't exist. Let me know if this makes sense to you.
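That reading of the javadoc can be checked with a stand-in provider (DemoSaslProvider is a made-up name, standing in for IAMSaslClientProvider): a second registration under an already-installed name returns -1 from both addProvider and insertProviderAt, so remove-then-add is needed to swap in a fresh instance:

```java
import java.security.Provider;
import java.security.Security;

public class ProviderReinstallDemo {
    // Providers are keyed by name, so two instances of the "same" provider
    // created by different classloaders collide on registration.
    static class DemoProvider extends Provider {
        DemoProvider(String info) {
            super("DemoSaslProvider", "1.0", info);
        }
    }

    public static void main(String[] args) {
        // First registration succeeds, returning the provider's position.
        int first = Security.addProvider(new DemoProvider("instance from classloader A"));

        // A second instance with the same name is rejected (-1), leaving
        // the stale instance installed: insertProviderAt alone is a no-op.
        int second = Security.insertProviderAt(new DemoProvider("instance from classloader B"), 1);

        // Removing by name first lets the fresh instance take its place.
        Security.removeProvider("DemoSaslProvider");
        int third = Security.insertProviderAt(new DemoProvider("instance from classloader B"), 1);

        System.out.println(first > 0);    // true
        System.out.println(second == -1); // true
        System.out.println(third == 1);   // true

        Security.removeProvider("DemoSaslProvider"); // clean up
    }
}
```

This is consistent with the remove-then-add workaround above fixing the restart case where insertProviderAt alone did not.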
I have the same issue with Flink (running on EKS) with a service role and MSK IAM. The job is simple:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setTopics(inputTopic)
        .setGroupId("my-group")
        .setProperty("security.protocol", "SASL_SSL")
        .setProperty("sasl.mechanism", "AWS_MSK_IAM")
        .setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
        .setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
and a similar error:
2022-08-17 15:52:11
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Kafka Source -> Sink: Print to Std. Out' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:329)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [my-topic].
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 9 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1]
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1
at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:85)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
... 14 more
Versions: org.apache.kafka:kafka-clients:2.8.1, software.amazon.msk:aws-msk-iam-auth:1.1.4, flink-connector-kafka:1.15.1, flink:1.15.1
@james4388 Did you try the workaround mentioned here? Of course, this requires control over the Flink cluster so that you can replace the libs in /opt/flink/lib/. I tried the same versions (Flink 1.15.1 with aws-msk-iam-auth:1.1.4) by moving the libs to /opt/flink/lib/, and it works as expected on EKS through the K8s operator in application mode (at least in my case). This is a workaround until the issue is fixed. Just for clarity, I am sharing the list of jars that I added to the lib location (see the following image).
For the record, I got the same issue trying to produce a record to MSK with GlueSchemaRegistryKafkaSerializer from Scala:
"software.amazon.glue" % "schema-registry-serde" % "1.1.14",
"software.amazon.msk" % "aws-msk-iam-auth" % "1.1.42",
"org.apache.kafka" % "kafka-clients" % "3.3.1"
Code:
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...")
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "AWS_MSK_IAM")
props.put("sasl.jaas.config", """software.amazon.msk.auth.iam.IAMLoginModule required;""")
props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[GlueSchemaRegistryKafkaSerializer].getName)
props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.JSON.name())
props.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-west-1")
props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "default-registry")
props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "GsrBlogSchema")

val jsonSchema = ...
val jsonPayload = ...
val jsonSchemaWithData = JsonDataWithSchema.builder(jsonSchema, jsonPayload).build()

val producer = new KafkaProducer[String, JsonDataWithSchema](props)
val record = new ProducerRecord[String, JsonDataWithSchema]("glue-json", "toulouse", jsonSchemaWithData)
producer.send(record).get()
producer.flush()
producer.close()
[error] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
[error] at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1429)
[error] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1071)
[error] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949)
[error] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:834)
[error] at Main$.delayedEndpoint$Main$1(Main.scala:85)
[error] at Main$delayedInit$body.apply(Main.scala:12)
[error] at scala.Function0.apply$mcV$sp(Function0.scala:42)
[error] at scala.Function0.apply$mcV$sp$(Function0.scala:42)
[error] at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
[error] at scala.App.$anonfun$main$1(App.scala:98)
[error] at scala.App.$anonfun$main$1$adapted(App.scala:98)
[error] at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
[error] at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
[error] at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
[error] at scala.App.main(App.scala:98)
[error] at scala.App.main$(App.scala:96)
[error] at Main$.main(Main.scala:12)
[error] at Main.main(Main.scala)
[error] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
[error] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.base/java.lang.reflect.Method.invoke(Method.java:568)
[error] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
[error] Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589]
[error] at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
[error] at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
[error] at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
[error] at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
[error] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
[error] at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
[error] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
[error] at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
[error] at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:534)
[error] at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:455)
[error] at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
[error] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
[error] at java.base/java.lang.Thread.run(Thread.java:833)
[error] Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589
[error] at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:85)
[error] at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
[error] at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
[error] at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
[error] at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
[error] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
[error] at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
[error] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
[error] at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
[error] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
[error] at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
[error] at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:534)
[error] at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:455)
[error] at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
[error] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
[error] at java.base/java.lang.Thread.run(Thread.java:833)
Converting the exact same code to Java makes it work; I don't know why. Also, for Scala, using a locally published version of aws-msk-iam-auth solves the issue.
Do you know when you will do a new release?
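The stack trace above shows AWSCredentialsCallback and IAMClientCallbackHandler being loaded by two different sbt.internal.LayeredClassLoader instances, which is exactly what makes the handler's type check fail and raise UnsupportedCallbackException. A possible workaround when launching via sbt run, an assumption based on that observation rather than something verified in this thread, is to fork the application into a separate JVM so sbt's layered classloaders are not used:

```scala
// build.sbt -- hypothetical workaround, not verified in this thread.
// Fork `sbt run` into a separate JVM with a flat classpath, so the
// aws-msk-iam-auth classes are not split across sbt's layered classloaders.
run / fork := true
```

This would also be consistent with the report that the same code works once published and run outside sbt.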
Fixed in release 1.1.5.
Hello,
Using the aws-msk-iam-auth library to authenticate Zeppelin and Flink processes against an MSK cluster with IAM authentication enabled, the following error is thrown when trying to run more than one task on the same TaskManager:
Library version 1.1.0 has been imported from an S3 bucket into the Studio notebook and added as a dependency. The following configuration is being used:
Since the default artifact included in the notebook (flink-sql-connector-kafka_2.12) shades the Kafka client dependencies, the following Gradle build was used to also shade the dependencies within aws-msk-iam-auth:
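The Gradle build referenced here is not shown in the thread; a sketch of what such shading could look like with the Shadow plugin follows. The plugin version, dependency version, and especially the relocation prefix are assumptions — the prefix must match the one actually used inside flink-sql-connector-kafka_2.12:

```groovy
// Hypothetical build.gradle sketch; versions and relocation prefix are assumptions.
plugins {
    id 'java'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

dependencies {
    implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.0'
}

shadowJar {
    // Relocate the Kafka client classes referenced by aws-msk-iam-auth so they
    // line up with the shaded packages inside flink-sql-connector-kafka_2.12.
    relocate 'org.apache.kafka', 'org.apache.flink.kafka.shaded.org.apache.kafka'
}
```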
The setup works and authenticates against MSK fine when the TaskManager executes a single task. However, as mentioned above, when more than one task runs on the same TaskManager, the following exception is thrown:
Could you please help with this issue?
Steps to reproduce: