roncemer / spark-sql-kinesis

Kinesis Connector for Spark Structured Streaming
http://www.roncemer.com
Apache License 2.0

Seeing InvalidSignatureException when using a non-standard endpointURL #5

Closed J-animA closed 8 months ago

J-animA commented 11 months ago

Hello,

Is there a way to specify the "region" used when signing requests (specifically, the region in the Credential field of the Authorization header)? My use case would benefit from a custom Kinesis endpoint, as it would allow a direct connection between the VPC where the Spark Kinesis job runs and the VPC that hosts the Kinesis stream being consumed.

Using the "standard" AWS Kinesis endpoint for us-east-1 works without issue, but it means Spark must first go out over the internet to authenticate and consume the stream, rather than using the direct connection.

The direct-connect/custom endpointUrl I am using is https://kinesis-ae1.hdw.r53.deap.tv, and from the debug logs I can see that the credential scope in the Authorization header uses "ae1" as the region, which appears to be taken from the "kinesis-ae1" part of the hostname rather than from the region I configured.
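To illustrate what I suspect is happening, here is a minimal sketch of a hostname heuristic that would produce exactly this result. I have not confirmed that the AWS SDK does this; `guess_region` and its pattern are hypothetical and only meant to show how "ae1" could be picked up from the custom hostname.

```python
import re
from urllib.parse import urlparse


def guess_region(endpoint_url, service):
    """Hypothetical heuristic: guess a signing region from an endpoint hostname.

    This is an assumption about the kind of logic involved, not the SDK's code.
    """
    host = urlparse(endpoint_url).hostname or ""
    # Look for "<service>.<region>." or "<service>-<region>." in the hostname.
    pattern = r"^(?:.+\.)?" + re.escape(service) + r"[.-]([a-z0-9-]+)\."
    match = re.search(pattern, host)
    return match.group(1) if match else None


# Standard endpoint versus the custom endpoint from this issue.
print(guess_region("https://kinesis.us-east-1.amazonaws.com", "kinesis"))
print(guess_region("https://kinesis-ae1.hdw.r53.deap.tv", "kinesis"))
```

If something like this is in play, the standard endpoint would yield a real region name while the custom one yields "ae1", which would match the difference in behaviour I am seeing.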

Full log:

23/09/05 10:05:02 DEBUG AWS4Signer: AWS4 String to Sign: '"AWS4-HMAC-SHA256
20230905T140502Z
20230905/ae1/kinesis/aws4_request
8d03e7b4f4ae6a31bdad88ffe815e7f27f5cf1b331bd980176e5a51f47c109f4"
23/09/05 10:05:02 DEBUG AWS4Signer: Generating a new signing key as the signing key not available in the cache for the date 1693872000000
23/09/05 10:05:02 DEBUG RequestAddCookies: CookieSpec selected: default
23/09/05 10:05:02 DEBUG RequestAuthCache: Auth cache not set in the context
23/09/05 10:05:02 DEBUG PoolingHttpClientConnectionManager: Connection request: [route: {s}->https://kinesis-ae1.hdw.r53.deap.tv:443][total available: 0; route allocated: 0 of 50; total allocated: 0 of 50]
23/09/05 10:05:02 DEBUG PoolingHttpClientConnectionManager: Connection leased: [id: 0][route: {s}->https://kinesis-ae1.hdw.r53.deap.tv:443][total available: 0; route allocated: 1 of 50; total allocated: 1 of 50]
23/09/05 10:05:02 DEBUG MainClientExec: Opening connection {s}->https://kinesis-ae1.hdw.r53.deap.tv:443
23/09/05 10:05:02 DEBUG DefaultHttpClientConnectionOperator: Connecting to kinesis-ae1.hdw.r53.deap.tv/96.115.201.171:443
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: connecting to kinesis-ae1.hdw.r53.deap.tv/96.115.201.171:443
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: Connecting socket to kinesis-ae1.hdw.r53.deap.tv/96.115.201.171:443 with timeout 10000
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: Enabled protocols: [TLSv1.3, TLSv1.2]
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: Enabled cipher suites:[TLS_AES_256_GCM_SHA384, TLS_AES_128_GCM_SHA256, TLS_CHACHA20_POLY1305_SHA256, TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_DHE_RSA_WITH_AES_256_GCM_SHA384, TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256, TLS_DHE_DSS_WITH_AES_256_GCM_SHA384, TLS_DHE_RSA_WITH_AES_128_GCM_SHA256, TLS_DHE_DSS_WITH_AES_128_GCM_SHA256, TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256, TLS_DHE_RSA_WITH_AES_256_CBC_SHA256, TLS_DHE_DSS_WITH_AES_256_CBC_SHA256, TLS_DHE_RSA_WITH_AES_128_CBC_SHA256, TLS_DHE_DSS_WITH_AES_128_CBC_SHA256, TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384, TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384, TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256, TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256, TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_DHE_RSA_WITH_AES_256_CBC_SHA, TLS_DHE_DSS_WITH_AES_256_CBC_SHA, TLS_DHE_RSA_WITH_AES_128_CBC_SHA, TLS_DHE_DSS_WITH_AES_128_CBC_SHA, TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA, TLS_ECDH_RSA_WITH_AES_256_CBC_SHA, TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA, TLS_ECDH_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_GCM_SHA384, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_256_CBC_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA256, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: socket.getSupportedProtocols(): [TLSv1.3, TLSv1.2, TLSv1.1, TLSv1, SSLv3, SSLv2Hello], socket.getEnabledProtocols(): [TLSv1.3, TLSv1.2]
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: TLS protocol enabled for SSL handshake: [TLSv1.2, TLSv1.1, TLSv1, TLSv1.3]
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: Starting handshake
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory: Secure session established
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory:  negotiated protocol: TLSv1.2
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory:  negotiated cipher suite: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory:  peer principal: CN=kinesis-ae1.hdw.r53.deap.tv
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory:  peer alternative names: [kinesis-ae1.hdw.r53.deap.tv]
23/09/05 10:05:02 DEBUG SdkTLSSocketFactory:  issuer principal: CN=Amazon RSA 2048 M01, O=Amazon, C=US
23/09/05 10:05:02 DEBUG SdkSSLSocket: created: kinesis-ae1.hdw.r53.deap.tv/96.115.201.171:443
23/09/05 10:05:02 DEBUG DefaultHttpClientConnectionOperator: Connection established 10.26.47.236:54753<->96.115.201.171:443
23/09/05 10:05:02 DEBUG DefaultManagedHttpClientConnection: http-outgoing-0: set socket timeout to 50000
23/09/05 10:05:02 DEBUG MainClientExec: Executing request POST / HTTP/1.1
23/09/05 10:05:02 DEBUG MainClientExec: Proxy auth state: UNCHALLENGED
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> POST / HTTP/1.1
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> Host: kinesis-ae1.hdw.r53.deap.tv
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> amz-sdk-invocation-id: 49ded3f6-e95f-5699-bd4c-98ba8cfa8744
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> amz-sdk-retry: 0/0/500
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> Authorization: AWS4-HMAC-SHA256 Credential=AKIA4235AZ77MLL7HD62/20230905/ae1/kinesis/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-length;content-type;host;user-agent;x-amz-date;x-amz-target, Signature=e8a659cb4dff91b3b453c22177ee6b99e0daa1f20f5411f0b97c379d3f8cf477
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> Content-Type: application/x-amz-cbor-1.1
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> User-Agent: aws-sdk-java/1.11.655 Mac_OS_X/13.3.1 OpenJDK_64-Bit_Server_VM/11.0.20+0 java/11.0.20 scala/2.12.17 vendor/Homebrew
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> X-Amz-Date: 20230905T140502Z
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> X-Amz-Target: Kinesis_20131202.ListShards
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> Content-Length: 45
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> Connection: Keep-Alive
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "POST / HTTP/1.1[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "Host: kinesis-ae1.hdw.r53.deap.tv[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "amz-sdk-invocation-id: 49ded3f6-e95f-5699-bd4c-98ba8cfa8744[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "amz-sdk-retry: 0/0/500[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "Authorization: AWS4-HMAC-SHA256 Credential=AKIA4235AZ77MLL7HD62/20230905/ae1/kinesis/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-length;content-type;host;user-agent;x-amz-date;x-amz-target, Signature=e8a659cb4dff91b3b453c22177ee6b99e0daa1f20f5411f0b97c379d3f8cf477[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "Content-Type: application/x-amz-cbor-1.1[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "User-Agent: aws-sdk-java/1.11.655 Mac_OS_X/13.3.1 OpenJDK_64-Bit_Server_VM/11.0.20+0 java/11.0.20 scala/2.12.17 vendor/Homebrew[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "X-Amz-Date: 20230905T140502Z[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "X-Amz-Target: Kinesis_20131202.ListShards[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "Content-Length: 45[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 >> "[0xbf]jStreamNameqxdpxconf.logs.stgjMaxResults[0x19]'[0x10][0xff]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "HTTP/1.1 400 Bad Request[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "Date: Tue, 05 Sep 2023 14:05:02 GMT[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "Content-Type: application/x-amz-cbor-1.1[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "Content-Length: 93[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "Connection: keep-alive[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "x-amzn-RequestId: c0bf6457-3fc6-766b-9dce-53991bab6331[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "x-amz-id-2: iFeaQAYMsrYDKOlFZXz5IkqQN7qr11e5TH4UH8KHIgDZKigQvxk7EtGUUm8jkuvEeCKfUju+M4QdKAiUR7UxNleaPH3/AgF3[\r][\n]"
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "[\r][\n]"
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << HTTP/1.1 400 Bad Request
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << Date: Tue, 05 Sep 2023 14:05:02 GMT
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << Content-Type: application/x-amz-cbor-1.1
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << Content-Length: 93
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << Connection: keep-alive
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << x-amzn-RequestId: c0bf6457-3fc6-766b-9dce-53991bab6331
23/09/05 10:05:02 DEBUG headers: http-outgoing-0 << x-amz-id-2: iFeaQAYMsrYDKOlFZXz5IkqQN7qr11e5TH4UH8KHIgDZKigQvxk7EtGUUm8jkuvEeCKfUju+M4QdKAiUR7UxNleaPH3/AgF3
23/09/05 10:05:02 DEBUG MainClientExec: Connection can be kept alive for 60000 MILLISECONDS
23/09/05 10:05:02 DEBUG wire: http-outgoing-0 << "[0xbf]f__typex[0x19]InvalidSignatureExceptiongmessagex/Credential should be scoped to a valid region. [0xff]"
23/09/05 10:05:02 DEBUG PoolingHttpClientConnectionManager: Connection [id: 0][route: {s}->https://kinesis-ae1.hdw.r53.deap.tv:443] can be kept alive for 60.0 seconds
23/09/05 10:05:02 DEBUG DefaultManagedHttpClientConnection: http-outgoing-0: set socket timeout to 0
23/09/05 10:05:02 DEBUG PoolingHttpClientConnectionManager: Connection released: [id: 0][route: {s}->https://kinesis-ae1.hdw.r53.deap.tv:443][total available: 1; route allocated: 1 of 50; total allocated: 1 of 50]
23/09/05 10:05:02 DEBUG request: Received error response: org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region.  (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: c0bf6457-3fc6-766b-9dce-53991bab6331)

Relevant line:

23/09/05 10:05:02 DEBUG headers: http-outgoing-0 >> Authorization: AWS4-HMAC-SHA256 Credential=AKIA4235AZ77MLL7HD62/20230905/ae1/kinesis/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-length;content-type;host;user-agent;x-amz-date;x-amz-target, Signature=e8a659cb4dff91b3b453c22177ee6b99e0daa1f20f5411f0b97c379d3f8cf477
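To make the problem in that header easier to see, here is a small sketch that splits the Credential field into its parts. The helper name `parse_credential_scope` is my own, and the access key ID is replaced with a placeholder; the rest of the scope is copied from the log line.

```python
import re

# Credential scope from the log line, with a placeholder access key ID.
AUTH_HEADER = (
    "AWS4-HMAC-SHA256 "
    "Credential=AKIAEXAMPLE/20230905/ae1/kinesis/aws4_request, "
    "SignedHeaders=host;x-amz-date;x-amz-target, "
    "Signature=0000"
)


def parse_credential_scope(header):
    """Split the Credential= field of a SigV4 Authorization header."""
    match = re.search(r"Credential=([^,\s]+)", header)
    if match is None:
        raise ValueError("no Credential field found")
    # Layout: <access key>/<date>/<region>/<service>/aws4_request
    access_key, date, region, service, terminator = match.group(1).split("/")
    return {
        "access_key": access_key,
        "date": date,
        "region": region,
        "service": service,
        "terminator": terminator,
    }


scope = parse_credential_scope(AUTH_HEADER)
print(scope["region"], scope["service"])
```

The third component is where I would expect to see us-east-1, and it is the value the service rejects with "Credential should be scoped to a valid region."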

I understand that this might have something to do with the implementation of the Kinesis Client Library, but I am wondering whether there is any way to override this header from within the Spark application code itself.
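One caveat on overriding the header: as I understand AWS Signature Version 4, the region is also an input to the signing key, so rewriting the header text alone would probably not be enough and the request would have to be signed with the correct region. A minimal sketch of the key derivation, using a placeholder secret (`derive_signing_key` is an illustrative name, not part of the connector):

```python
import hashlib
import hmac


def _hmac_sha256(key, msg):
    """Return the raw HMAC-SHA256 digest of msg under key."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key, date, region, service):
    """Derive a SigV4 signing key: date -> region -> service -> aws4_request."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


SECRET = "example-secret-key"  # placeholder, not a real credential

key_ae1 = derive_signing_key(SECRET, "20230905", "ae1", "kinesis")
key_use1 = derive_signing_key(SECRET, "20230905", "us-east-1", "kinesis")

# The two keys differ, so a signature computed for "ae1" cannot be
# turned into a valid "us-east-1" signature by editing the header text.
print(key_ae1.hex())
print(key_use1.hex())
```

So what I am really after is a way to tell the signer which region to use, independently of the endpoint hostname.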

For reference, here is the relevant readStream block from the PySpark app. Notice the region is explicitly specified:

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName(appName) \
        .getOrCreate()

    kinesis = spark.readStream \
        .format("kinesis") \
        .option("region", "us-east-1") \
        .option("streamName", stream_name) \
        .option("endpointUrl", "https://kinesis-ae1.hdw.r53.deap.tv") \
        .option("awsAccessKeyId", awsAccessKeyId) \
        .option("awsSecretKey", awsSecretKey) \
        .option("startingposition", "LATEST") \
        .load()

Just to reiterate: setting the endpointUrl option to https://kinesis.us-east-1.amazonaws.com allows the application to work without issues, but this is not ideal from a performance standpoint.

Thanks for any insight!