apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Remote connection issue while testing locally Apache Hudi with Glue Image and LocalStack #8691

Open danfran opened 1 year ago

danfran commented 1 year ago


Describe the problem you faced

I am trying to run some tests in Docker using the image amazon/aws-glue-libs:glue_libs_4.0.0_image_01, with LocalStack providing the AWS environment. No matter what I configure, every time the tests run Hudi tries to connect to the real AWS endpoint instead of pointing to LocalStack. This is the configuration I am using at the moment:

packages = [
        '/home/glue_user/spark/jars/spark-avro_2.12-3.3.0-amzn-1.jar',
        '/home/glue_user/aws-glue-libs/datalake-connectors/hudi-0.12.1/hudi-spark3-bundle_2.12-0.12.1.jar',
        '/home/glue_user/aws-glue-libs/jars/aws-java-sdk-1.12.128.jar',
        '/home/glue_user/aws-glue-libs/jars/aws-java-sdk-glue-1.12.128.jar',
        '/home/glue_user/spark/jars/hadoop-aws-3.3.3-amzn-0.jar',
    ]

    conf = SparkConf() \
        .set('spark.jars', ','.join(packages))\
        .set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
        .set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')\
        .set('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')

    spark_context = SparkContext(conf=conf)
    glue_context = GlueContext(spark_context)
    spark_session = glue_context.spark_session

    # HUDI S3 ACCESS
    spark_session.conf.set('fs.defaultFS', 's3://mybucket')
    spark_session.conf.set('fs.s3.awsAccessKeyId', 'test')
    spark_session.conf.set('fs.s3.awsSecretAccessKey', 'test')
    spark_session.conf.set('fs.s3a.awsAccessKeyId', 'test')
    spark_session.conf.set('fs.s3a.awsSecretAccessKey', 'test')
    spark_session.conf.set('fs.s3a.endpoint', 'http://localstack:4566')
    spark_session.conf.set('fs.s3a.connection.ssl.enabled', 'false')
    spark_session.conf.set('fs.s3a.path.style.access', 'true')
    spark_session.conf.set('fs.s3a.signing-algorithm', 'S3SignerType')
    spark_session.conf.set('spark.sql.legacy.setCommandRejectsSparkCoreConfs', 'false')

    # SPARK CONF
    spark_session.conf.set('spark.sql.shuffle.partitions', '2')
    spark_session.conf.set('spark.sql.crossJoin.enabled', 'true')

To Reproduce


Expected behavior

How can I make it point to my local environment (http://localstack:4566) instead of remote AWS?

Environment Description

Additional context


Stacktrace


An error occurred while calling o1719.save.
: java.nio.file.AccessDeniedException: s3://mybucket/myzone/location/.hoodie: getFileStatus on s3://mybucket/myzone/location/.hoodie: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: B1ZJ8JDPY2HX514F; S3 Extended Request ID: rzBqoLQxJb4PSKNW+uCbyVCbqYtpCB0aFHvX7JWTCDJ/PTfQdgESAkOzxWR6aPua8OhuEcajIM8=; Proxy: null), S3 Extended Request ID: rzBqoLQxJb4PSKNW+uCbyVCbqYtpCB0aFHvX7JUTVIJ/PTfQdgEHNkOzxWR6aPua8OhuEcajIM8=:403 Forbidden
ad1happy2go commented 1 year ago

@danfran It looks like an S3 access issue. I noticed you have set fs.s3a.endpoint but not fs.s3.endpoint. Is there a reason for that?

Can you provide the entire stack trace? That will help us know whether it is using the S3FileSystem class or not.

danfran commented 1 year ago

@ad1happy2go the reason is that I have tried different configurations, including the one reported here https://hudi.apache.org/docs/s3_hoodie/ (which was actually the first one I tried).

If I use this (s3://):

    spark_session.conf.set('fs.defaultFS', 's3://' + loader_params.POLEV2_OUTPUT_BUCKET_ARG)
    spark_session.conf.set('fs.s3.awsAccessKeyId', 'test')
    spark_session.conf.set('fs.s3.awsSecretAccessKey', 'test')
    spark_session.conf.set('fs.s3.endpoint', 'http://localstack:4566')
    spark_session.conf.set('fs.s3.connection.ssl.enabled', 'false')
    spark_session.conf.set('fs.s3.path.style.access', 'true')
    spark_session.conf.set('fs.s3.signing-algorithm', 'S3SignerType')
    spark_session.conf.set('spark.sql.legacy.setCommandRejectsSparkCoreConfs', 'false')

I get:

 23/05/17 16:07:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/05/17 16:07:45 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://mybucket/myfolder.
java.nio.file.AccessDeniedException: s3://mybucket/myfolder: getFileStatus on s3://mybucket/myfolder: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: M9D08JH8VYX80XRA; S3 Extended Request ID: IxbO6iepeXUPli2Pa3JokElqAnsBnecjuat/+OEaTQiqcpuFcyyDlvioju7ip6uvVmcxoWAP/zY=; Proxy: null), S3 Extended Request ID: IxbO6iepeXUPli2Pa3JokElqAnsBnecjuat/+OEaTQiqcpuFcyyDlvioju7ip6uvVmcxoWAP/zY=:InvalidAccessKeyId
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3858)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$isDirectory$35(S3AFileSystem.java:4724)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.isDirectory(S3AFileSystem.java:4722)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:54)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: M9D08JH8VYX80XRA; S3 Extended Request ID: IxbO6iepeXUPli2Pa3JokElqAnsBnecjuat/+OEaTQiqcpuFcyyDlvioju7ip6uvVmcxoWAP/zY=; Proxy: null), S3 Extended Request ID: IxbO6iepeXUPli2Pa3JokElqAnsBnecjuat/+OEaTQiqcpuFcyyDlvioju7ip6uvVmcxoWAP/zY=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:713)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5453)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5400)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5394)
        at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:971)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832)
        ... 26 more

With the s3a config:

    spark_session.conf.set('fs.defaultFS', 's3a://' + loader_params.POLEV2_OUTPUT_BUCKET_ARG)
    spark_session.conf.set('fs.s3a.awsAccessKeyId', 'test')
    spark_session.conf.set('fs.s3a.awsSecretAccessKey', 'test')
    spark_session.conf.set('fs.s3a.endpoint', 'http://localstack:4566')
    spark_session.conf.set('fs.s3a.connection.ssl.enabled', 'false')
    spark_session.conf.set('fs.s3a.path.style.access', 'true')
    spark_session.conf.set('fs.s3a.signing-algorithm', 'S3SignerType')
    spark_session.conf.set('spark.sql.legacy.setCommandRejectsSparkCoreConfs', 'false')

I get:

An error occurred while calling o453.save.
: java.nio.file.AccessDeniedException: s3://mybucket/myfolder/.hoodie: getFileStatus on s3://mybucket/myfolder/.hoodie: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ZXBPV37ZW83PVFZD; S3 Extended Request ID: in5YLZn3rlRFngTEUn027bffHZ1NIUOl2SyROsffWx/mNPXkJwmrofm305jVBHVuanVNHr9GK2Q=; Proxy: null), S3 Extended Request ID: in5YLZn3rlRFngTEUn027bffHZ1NIUOl2SyROsffWx/mNPXkJwmrofm305jVBHVuanVNHr9GK2Q=:403 Forbidden
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:86)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ZXBPV37ZW83PVFZD; S3 Extended Request ID: in5YLZn3rlRFngTEUn027bffHZ1NIUOl2SyROsffWx/mNPXkJwmrofm305jVBHVuanVNHr9GK2Q=; Proxy: null), S3 Extended Request ID: in5YLZn3rlRFngTEUn027bffHZ1NIUOl2SyROsffWx/mNPXkJwmrofm305jVBHVuanVNHr9GK2Q=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:713)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5453)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5400)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
        ... 56 more

I have also tried prefixing the properties with spark.hadoop. (e.g. spark.hadoop.fs.s3a.endpoint), but nothing changed.
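For reference, the spark.hadoop.-prefixed variant looked roughly like this (just a sketch, property names approximated; still set on the session conf at runtime):

    # same LocalStack values as above, only with the spark.hadoop. prefix
    spark_session.conf.set('spark.hadoop.fs.s3a.endpoint', 'http://localstack:4566')
    spark_session.conf.set('spark.hadoop.fs.s3a.access.key', 'test')
    spark_session.conf.set('spark.hadoop.fs.s3a.secret.key', 'test')
    spark_session.conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
    spark_session.conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')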

ad1happy2go commented 1 year ago

@danfran Were you able to resolve this issue? If yes, can you let us know the resolution?

danfran commented 1 year ago

Unfortunately it's not working yet.

cannon-tp commented 2 months ago

Hey @danfran, I think the way the Hadoop properties are being set on the Spark conf could be the problem. I faced the same issue and resolved it with the following code, which sets them on the SparkConf (with the spark.hadoop. prefix) before the context is created.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

conf = (SparkConf().setAppName("hudi-1")
        .set("spark.hadoop.fs.s3a.endpoint", "http://localstack:4566")
        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .set("spark.hadoop.fs.s3a.multipart.size", "104857600")
        .set("spark.hadoop.fs.s3a.access.key", "test")
        .set("spark.hadoop.fs.s3a.secret.key", "test")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("spark.hadoop.fs.s3a.path.style.access", "true")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.3.3")
        .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .set("spark.sql.legacy.timeParserPolicy", "LEGACY")
       )

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
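
With that conf in place, a quick smoke test like the one below (bucket name, table name, and columns are just placeholders I made up) writes against LocalStack instead of the real AWS endpoint:

# Hypothetical smoke test: bucket, table name, and schema are placeholders.
hudi_options = {
    "hoodie.table.name": "smoke_test",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.operation": "upsert",
}

df = spark.createDataFrame([(1, "a", 1000), (2, "b", 1001)], ["id", "name", "ts"])

(df.write.format("hudi")
   .options(**hudi_options)
   .mode("overwrite")
   .save("s3a://mybucket/hudi/smoke_test"))  # s3a:// path resolves against http://localstack:4566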