googleapis / google-auth-library-java

Open source Auth client library for Java
https://developers.google.com/identity
BSD 3-Clause "New" or "Revised" License

Flink app hosted in AWS trying to publish to GCP and throwing NPE at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource #1538

Open Ghilherme opened 1 month ago

Ghilherme commented 1 month ago

Environment details

We are using Flink 1.19 with Java 11, hosted on an EC2 instance in AWS, and trying to publish messages to GCP Pub/Sub. We use Workload Identity Federation to exchange tokens between AWS and GCP.

We are using the PubSubSink connector from the Flink docs: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/pubsub/#pubsub-sink

Our code only generates some mock data and lets the connector publish it, without any complex logic, purely for validation.

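import com.google.auth.oauth2.ExternalAccountCredentials;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

public class FlinkToPubSub {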
    private static final Logger log = LoggerFactory.getLogger(FlinkToPubSub.class.getName());

    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack" , "true");
        log.info("Initializing application...");
        S3Client s3 = S3Client.create();
        String bucketName = "bucketCompanyName";
        String key = "key-aws-gcp-pp.json";
        String localPath = "/opt/response_test_" + LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + ".json";

        s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build(), Paths.get(localPath));

        ExternalAccountCredentials credentials = ExternalAccountCredentials.fromStream(new FileInputStream(localPath));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1000L);

        DataStream<String> testDataStream = env.addSource(new TestDataGenerator());

        testDataStream.addSink(createPubSink(credentials));

        env.execute("Flink to GCP Pub/Sub Test");
    }

    private static PubSubSink<String> createPubSink(ExternalAccountCredentials credentials) throws IOException {
        String projectName = "project-non-prod";
        String topicName = "topicName";

        return PubSubSink.newBuilder()
                .withSerializationSchema(new SimpleStringSchema())
                .withProjectName(projectName)
                .withTopicName(topicName)
                .withCredentials(credentials).build();
    }

    private static class TestDataGenerator implements SourceFunction<String> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {

            while (running) {
                var time = LocalDateTime.now();
                ctx.collect(time + "somemockdatamessage");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Stack trace

Caused by: java.lang.RuntimeException: Failed trying to publish message
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink$FailureHandler.onFailure(PubSubSink.java:342)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at org.apache.flink.util.concurrent.Executors$DirectExecutor.execute(Executors.java:60)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
    at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92)
    at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74)
    at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:51)
    at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onFailure(Publisher.java:571)
    at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1900(Publisher.java:538)
    at com.google.cloud.pubsub.v1.Publisher$3.onFailure(Publisher.java:514)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
    at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:200)
    at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
    at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
    at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92)
    at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: com.google.api.gax.rpc.UnauthenticatedException: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Failed computing credential metadata
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:116)
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    ... 28 more
Caused by: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Failed computing credential metadata
    at io.grpc.Status.asRuntimeException(Status.java:533)
    ... 20 more
Caused by: java.lang.NullPointerException
    at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource(InternalAwsSecurityCredentialsSupplier.java:204)
    at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource(InternalAwsSecurityCredentialsSupplier.java:193)
    at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.getRegion(InternalAwsSecurityCredentialsSupplier.java:151)
    at com.google.auth.oauth2.AwsCredentials.retrieveSubjectToken(AwsCredentials.java:138)
    at com.google.auth.oauth2.AwsCredentials.refreshAccessToken(AwsCredentials.java:121)
    at com.google.auth.oauth2.OAuth2Credentials$1.call(OAuth2Credentials.java:270)
    at com.google.auth.oauth2.OAuth2Credentials$1.call(OAuth2Credentials.java:267)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at com.google.auth.oauth2.OAuth2Credentials$RefreshTask.run(OAuth2Credentials.java:635)
    ... 3 more

The error appears to originate at this line of code in this repo: com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource(InternalAwsSecurityCredentialsSupplier.java:204)

How can we solve this?

Thanks!

Ghilherme commented 1 month ago

pom.xml:

`<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>gcp-pub-sub</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.19.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-gcp-pubsub</artifactId>
        <version>3.1.0-1.19</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.20.68</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.62.2</version> <!-- Use the appropriate version -->
    </dependency>

    <dependency>
        <groupId>com.google.http-client</groupId>
        <artifactId>google-http-client</artifactId>
        <version>1.44.1</version>
    </dependency>

    <dependency>
        <groupId>com.google.http-client</groupId>
        <artifactId>google-http-client-apache-v2</artifactId>
        <version>1.44.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>11</source>
                <target>11</target>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.example.FlinkToPubSub</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
</project>
`

stankiewicz commented 1 month ago

I wonder whether this is related to https://github.com/googleapis/google-auth-library-java/issues/1408

lqiu96 commented 3 days ago

@lsirac This is probably something that you are more familiar with. Potentially dealing with Workload Identity Federation.

stankiewicz commented 2 days ago

Hi, we were able to diagnose the issue. The root cause is that some libraries, such as the Flink Pub/Sub connector, serialize the credentials object. That is not a great idea, but it may work for GoogleCredentials. In this case, AwsCredentials extends ExternalAccountCredentials, which has a transient transportFactory field. After deserialization the credentials are unusable, because the transient transportFactory is not restored and accessing it throws an NPE.
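To illustrate the mechanism described here, the following is a minimal sketch of how a transient field is lost in a Java serialization round trip. `FakeCredentials` and `FakeTransportFactory` are hypothetical stand-ins invented for this example. They are not the library's classes and do not reproduce its internals; they only show the general Java behavior that would lead to this kind of NPE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for a transport factory; deliberately not Serializable.
    static class FakeTransportFactory {
        String create() {
            return "transport";
        }
    }

    // Hypothetical stand-in for a credentials class that keeps its factory in a transient field.
    static class FakeCredentials implements Serializable {
        private static final long serialVersionUID = 1L;

        final String audience;                      // serialized normally
        transient FakeTransportFactory transportFactory; // skipped by Java serialization

        FakeCredentials(String audience, FakeTransportFactory transportFactory) {
            this.audience = audience;
            this.transportFactory = transportFactory;
        }

        String retrieveResource() {
            // Throws NullPointerException when transportFactory was not restored.
            return transportFactory.create();
        }
    }

    // Serializes the value to a byte array and reads it back, as a framework
    // might do when shipping an object to a remote worker.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        FakeCredentials original = new FakeCredentials("aud", new FakeTransportFactory());
        System.out.println(original.retrieveResource()); // works before serialization

        FakeCredentials copy = roundTrip(original);
        System.out.println(copy.audience);                 // non-transient state survives
        System.out.println(copy.transportFactory == null); // transient field is null

        try {
            copy.retrieveResource();
        } catch (NullPointerException e) {
            System.out.println("NPE after deserialization");
        }
    }
}
```

The copy keeps its ordinary fields but loses the transient one, so the first call that touches it fails, which matches the symptom in the stack trace above.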

I haven't found a way to fix or patch transportFactory on an existing ExternalAccountCredentials object.

The solution is to use a different Flink connector that sets up credentials correctly, from local ADC via a provider function.
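As a rough sketch of the "provider function" idea, the sink can serialize a small function that builds credentials on the worker, rather than serializing the credentials object itself. Everything below (`SerializableSupplier`, `FakeCredentials`, `FakeSink`, `open()`) is hypothetical and invented for illustration. It is not the API of any particular Flink connector or of this library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class CredentialsProviderDemo {

    // A Supplier that is also Serializable, so a lambda of this type can be shipped to workers.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical credentials type; intentionally NOT Serializable.
    static class FakeCredentials {
        final String source;

        FakeCredentials(String source) {
            this.source = source;
        }

        String token() {
            return "token-from-" + source;
        }
    }

    // Hypothetical sink: it serializes only the provider and builds credentials in open().
    static class FakeSink implements Serializable {
        private static final long serialVersionUID = 1L;

        private final SerializableSupplier<FakeCredentials> provider;
        private transient FakeCredentials credentials; // created on the worker, never serialized

        FakeSink(SerializableSupplier<FakeCredentials> provider) {
            this.provider = provider;
        }

        void open() {
            credentials = provider.get();
        }

        String publish(String message) {
            return message + " sent with " + credentials.token();
        }
    }

    // Simulates shipping the sink to a worker, then opening and using it there.
    static String runDemo() throws Exception {
        FakeSink sink = new FakeSink(() -> new FakeCredentials("local-adc"));

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        FakeSink onWorker;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            onWorker = (FakeSink) in.readObject();
        }

        onWorker.open(); // credentials are built fresh after deserialization
        return onWorker.publish("hello");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

Because the credentials are constructed after deserialization, none of their transient state ever has to survive a round trip.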