Closed PatrykMilewski closed 1 year ago
Thank you for your friendly response and sharing good information. I'm going to try to make some examples about processing CDC data with Iceberg and Glue as soon as I can.
Hi Patryk Milewski
I have uploaded two CDK projects that show how to process CDC data with AWS Glue Streaming and store it in Amazon S3 in Apache Iceberg format.
(1) https://github.com/aws-samples/transactional-datalake-using-apache-iceberg-on-aws-glue
(2) https://github.com/aws-samples/aws-glue-streaming-etl-with-apache-iceberg
You can also find similar examples in the glue directory of this repository.
I hope these examples are helpful for you.
Hey! Thanks, it looks super cool! To be honest, I've just finished implementing almost the same thing (ETL, not streaming). (Btw, aws-glue-streaming-etl-with-apache-iceberg is still a private repo, I think; GitHub shows a 404 when I open the link.)
A few things that I've noticed for ETL:
Is "--extra-jars": "s3://aws-glue-assets-123456789012-us-east-1/extra-jars/aws-sdk-java-2.17.224.jar" really needed? I've seen it mentioned in the Iceberg docs, but isn't the AWS SDK already packaged in the Glue runtime? I'm asking because the S3 integration with conf.set(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") works fine for me without that jar.
Did you try using engine_version='3.4.7' for DMS? I had a lot of problems with it, because it requires creating VPC endpoints and I couldn't get it to work with Secrets Manager. In the end I had to give up on direct Secrets Manager integration (I see that you didn't use it either). In my case the latest DMS version was necessary, because the previous one didn't support Postgres 14 and Aurora Serverless v2.
In the DMS replication task table mappings, you put "schema-name": database_name to filter what is read from the input database. I had some issues with that, because in Postgres, for example, it is the schema name and not the database name; I'm not sure about MySQL. Maybe it would be worth adding a comment there saying that it should be the schema name for Postgres? Or maybe it's also the schema name in MySQL? You could also add a comment for table names saying that it can be % if somebody wants to get all tables.
About Glue job config:
worker_type="G.1X" # ['Standard' | 'G.1X' | 'G.2X' | 'G.025x']
G.025x is not available for ETL jobs, just for streaming.
Did you have issues with AWS DMS CDC? Currently, with this config for the replication task:
// https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.html
replicationTaskSettings: JSON.stringify({
  ChangeProcessingTuning: {
    BatchApplyTimeoutMax: 5,
    MinTransactionSize: 1,
    BatchApplyMemoryLimit: 250,
  },
  TargetMetadata: {
    BatchApplyEnabled: true,
  },
}),
I'm not able to get a new CDC file in the output S3 bucket after making a few small updates in the database. Previously I tried BatchApplyEnabled: false with the rest of the options left at their defaults, with no luck.
Sorry for the long message, but I just wanted to share my feedback as somebody who was building a data lake based on your examples 😄 Maybe it will make it easier for others to use them and avoid my mistakes/misunderstandings.
I fixed the private repository link; you can now access it here: https://github.com/aws-samples/aws-glue-streaming-etl-with-apache-iceberg
Q1) Is "--extra-jars": "s3://aws-glue-assets-123456789012-us-east-1/extra-jars/aws-sdk-java-2.17.224.jar" needed for sure?
A) When using Glue Streaming, I ran into this weird exception:
An error occurred while calling o135.start.
: java.lang.NoSuchMethodError: software.amazon.awssdk.utils.SystemSetting.getStringValueFromEnvironmentVariable(Ljava/lang/String;)Ljava/util/Optional;
at software.amazon.awssdk.awscore.interceptor.TraceIdExecutionInterceptor.lambdaFunctionNameEnvironmentVariable(TraceIdExecutionInterceptor.java:63)
at software.amazon.awssdk.awscore.interceptor.TraceIdExecutionInterceptor.modifyHttpRequest(TraceIdExecutionInterceptor.java:40)
at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.modifyHttpRequestAndHttpContent(ExecutionInterceptorChain.java:90)
at software.amazon.awssdk.core.internal.handler.BaseClientHandler.runModifyHttpRequestAndHttpContentInterceptors(BaseClientHandler.java:151)
...
It happened because software.amazon.awssdk.utils.SystemSetting.getStringValueFromEnvironmentVariable is implemented not in AWS SDK for Java version 1 but in AWS SDK for Java version 2 (aws-sdk-java-v2).
So I found that adding the extra jar file works around this problem when running a Glue Streaming job.
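For context, the S3FileIO property mentioned in the original question is one of a handful of Spark properties that connect an Iceberg catalog to the Glue Data Catalog and S3. The sketch below just collects them in one place. The helper name iceberg_catalog_conf, the catalog name, and the warehouse path are assumptions made for illustration and are not taken from the sample repositories.

```python
from typing import Dict


def iceberg_catalog_conf(catalog: str, warehouse: str) -> Dict[str, str]:
    """Return Spark properties for an Iceberg catalog backed by Glue and S3.

    `catalog` and `warehouse` are placeholders chosen by the caller.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables Iceberg SQL extensions such as MERGE INTO.
        "spark.sql.extensions": (
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        ),
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse,
        # Use the Glue Data Catalog as the Iceberg catalog implementation.
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        # The property quoted in the question: read and write through S3FileIO.
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }


# Hypothetical values, for illustration only.
settings = iceberg_catalog_conf("glue_catalog", "s3://my-example-bucket/warehouse/")
```

In a Glue script, each pair would typically be applied with conf.set(key, value) on a SparkConf before the SparkContext is created.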
Q2) Did you try using engine_version='3.4.7' for DMS?
A) Unfortunately, no.
Q3) Your suggestions?
In the DMS replication task table mappings, you put "schema-name": database_name to filter what is read from the input database. I had some issues with that, because in Postgres, for example, it is the schema name and not the database name; I'm not sure about MySQL. Maybe it would be worth adding a comment there saying that it should be the schema name for Postgres? Or maybe it's also the schema name in MySQL? You could also add a comment for table names saying that it can be % if somebody wants to get all tables.
A) Thanks for the good suggestion. I agree that adding a comment would be helpful to others.
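To make the schema-name point concrete, here is a minimal sketch of a DMS selection rule built in Python. The helper name build_table_mappings and the example schema names are assumptions made for illustration. In MySQL a schema is the same thing as a database, so passing the database name works there, while for Postgres the value would be a schema such as public.

```python
import json


def build_table_mappings(schema_name: str, table_name: str = "%") -> str:
    """Build a DMS table-mappings JSON string with a single selection rule.

    `table_name` defaults to "%", the wildcard that matches every table
    in the given schema.
    """
    mappings = {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    # Postgres: a schema such as "public", not the database.
                    # MySQL: the database name (schema and database coincide).
                    "schema-name": schema_name,
                    "table-name": table_name,
                },
                "rule-action": "include",
            }
        ]
    }
    return json.dumps(mappings)


# Hypothetical example: replicate every table in the Postgres "public" schema.
postgres_mappings = build_table_mappings("public")
```

The resulting string is the kind of value that would be passed as the table mappings of a DMS replication task.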
Q4) Shouldn't it use Glue 4.0?
A) At the time I wrote these examples, Glue Streaming did not support Glue 4.0.
Q5) About Glue job config: I have job bookmarks enabled, which works great with DMS CDC, as you don't need to manage those files yourself and Glue knows which ones were already imported.
A) Glue Streaming jobs cannot use the job bookmark feature. Job bookmarks are only available in Glue batch jobs.
Q6) About Glue Job config: worker_type="G.1X" # ['Standard' | 'G.1X' | 'G.2X' | 'G.025x']
G.025x is not available for ETL jobs, just for streaming
A) Actually, I didn't pay much attention to worker_type. I just chose one that is neither too small nor too big. I'd recommend checking the official developer guide.
Q7) Did you have issues with AWS DMS CDC? Currently, with this config for the replication task:
// https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.html
replicationTaskSettings: JSON.stringify({
  ChangeProcessingTuning: {
    BatchApplyTimeoutMax: 5,
    MinTransactionSize: 1,
    BatchApplyMemoryLimit: 250,
  },
  TargetMetadata: {
    BatchApplyEnabled: true,
  },
}),
A) I have no idea about this. If I were trying to find the root cause, I would check (1) whether the RDS binlog is replicated correctly and (2) whether the problem is related to the DMS target. How about comparing two DMS targets, S3 vs. Kinesis?
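For anyone working from the Python CDK samples rather than TypeScript, the task settings quoted in Q7 could be expressed roughly as follows. This is only a translation sketch: the helper name build_replication_task_settings is an assumption, and the default values simply mirror the ones quoted in the question rather than recommended settings.

```python
import json


def build_replication_task_settings(
    batch_apply_enabled: bool = True,
    batch_apply_timeout_max: int = 5,
    min_transaction_size: int = 1,
    batch_apply_memory_limit: int = 250,
) -> str:
    """Serialize the DMS task settings from Q7 into a JSON string."""
    settings = {
        "ChangeProcessingTuning": {
            "BatchApplyTimeoutMax": batch_apply_timeout_max,
            "MinTransactionSize": min_transaction_size,
            "BatchApplyMemoryLimit": batch_apply_memory_limit,
        },
        "TargetMetadata": {
            "BatchApplyEnabled": batch_apply_enabled,
        },
    }
    return json.dumps(settings)


# Same values as in the question; swap the flag to compare both modes.
with_batch_apply = build_replication_task_settings()
without_batch_apply = build_replication_task_settings(batch_apply_enabled=False)
```

Generating both variants from one function makes it easier to compare task behavior with BatchApplyEnabled on and off while keeping every other setting identical.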
If you convert the DMS output to Apache Iceberg format in batch mode (for example, by running an hourly Glue ETL job), the aws-dms-cdc-data-pipeline example will give you some insight. I designed the example architecture on the assumption that CDC data is stored in S3 in append-only mode. So if you run a Glue (batch) ETL job to merge the CDC data into the Iceberg table, you can get what you want with that architecture.
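The batch merge described above could look roughly like the sketch below, which only builds the Iceberg MERGE INTO statement as a string. Everything named here is an assumption for illustration: the helper build_cdc_merge_sql, the table and view names, and the operation column Op holding 'D' for deletes. It also assumes the source view has already been reduced to the latest change per key, since a merge expects at most one source row per target row.

```python
from typing import Sequence


def build_cdc_merge_sql(
    target: str,
    source_view: str,
    key_columns: Sequence[str],
    data_columns: Sequence[str],
    op_column: str = "Op",
) -> str:
    """Build a MERGE INTO statement that applies CDC rows to an Iceberg table."""
    all_columns = list(key_columns) + list(data_columns)
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_columns)
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in data_columns)
    insert_cols = ", ".join(all_columns)
    insert_vals = ", ".join(f"s.{c}" for c in all_columns)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source_view} s\n"
        f"ON {on_clause}\n"
        # Deletes remove the matching row from the target.
        f"WHEN MATCHED AND s.{op_column} = 'D' THEN DELETE\n"
        # Any other matched change overwrites the non-key columns.
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        # Unmatched rows are inserted unless they are deletes.
        f"WHEN NOT MATCHED AND s.{op_column} != 'D' "
        f"THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


# Hypothetical names, for illustration only.
merge_sql = build_cdc_merge_sql(
    target="glue_catalog.cdc_iceberg_demo_db.employee",
    source_view="cdc_updates",
    key_columns=["emp_no"],
    data_columns=["name", "department", "city", "salary", "m_time"],
)
```

In a Glue batch job the resulting string would be passed to spark.sql(...) after registering the deduplicated CDC DataFrame as the source view.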
Hey, just wanted to say thanks for the Glue example with Iceberg. I think it's the most helpful example on the internet, and the complexity and confusion around Glue with Iceberg don't make things any easier!
Maybe you would be able to write an article for the AWS Big Data blog about AWS Glue ETL with Iceberg and CDC, with a ready-to-deploy example stack? It would be super nice, as there are really a lot of examples that either don't work or don't cover CDC.
And btw. this example can now be simplified in a few ways:
IF NOT EXISTS:
createTableSqlQuery = f"""
CREATE TABLE IF NOT EXISTS {CATALOG}.{DATABASE}.{TABLE_NAME} (
    emp_no bigint,
    name string,
    department string,
    city string,
    salary int,
    m_time timestamp,
    last_applied_date timestamp)
PARTITIONED BY (department)
LOCATION '{ICEBERG_S3_PATH}'
TBLPROPERTIES (
    'table_type'='iceberg'
);
"""
sparkSqlQuery(
    glueContext,
    query=createTableSqlQuery,
    mapping={},
    transformation_ctx="createTableSqlQuery",
)

createDatabaseSqlQuery = f"""
CREATE DATABASE IF NOT EXISTS {DATABASE};
"""
sparkSqlQuery(
    glueContext,
    query=createDatabaseSqlQuery,
    mapping={},
    transformation_ctx="createDatabaseSqlQuery",
)