dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

AWS S3 and Glue Catalog / Athena integration #172

Closed nicor88 closed 10 months ago

nicor88 commented 1 year ago

AWS Athena integration

We want to create a new Athena destination which will use the filesystem staging destination and create Athena / Glue Catalog definitions for the data contained therein. Since we know the schema of the files we upload to the destination bucket, we can use the HiveQL interface of Athena to create and update tables. These tables will also be available in the Glue catalog. For now there is a very hacky prototype on the d#/Athena_poc branch that demonstrates how some of this works; a lot of stuff is still missing there, but it is a start.
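For illustration, a minimal sketch of the kind of DDL such a destination could issue against Athena via boto3 (the bucket, database, column names and layout are made-up placeholders, not taken from the prototype branch):

```python
import boto3

athena = boto3.client("athena", region_name="eu-west-1")

# Hypothetical layout: one folder per dlt table in the staging bucket.
create_table_sql = """
CREATE EXTERNAL TABLE IF NOT EXISTS bronze.events (
    id BIGINT,
    name STRING,
    created_at TIMESTAMP
)
STORED AS PARQUET
LOCATION 's3://my-staging-bucket/datalake/events/'
"""

# Athena needs an S3 location for query results.
athena.start_query_execution(
    QueryString=create_table_sql,
    QueryExecutionContext={"Database": "bronze"},
    ResultConfiguration={"OutputLocation": "s3://my-staging-bucket/athena-results/"},
)
```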

Tasks first version

Tasks second version

Tests

Other thoughts

Original request:

What

Nowadays warehouses like BigQuery/Redshift/Snowflake are not the only destinations in use. Often companies use S3 (or other object storage) as a destination. It would be great to support S3 as a destination - JSON format is OK-ish (ideally Parquet is supported).

When adding data to S3 destinations it would be great to register the dataset in the Glue catalog; to achieve that, consider the usage of:

Something similar can be done for Snowflake - after ingesting the data into S3, the dataset can be registered as an external table in Snowflake.

Notes

It's generally a good idea to use S3 as a destination also when using Snowflake, because pure S3 storage is cheaper than Snowflake storage - in this case it's possible to use Snowflake external tables in combination with dbt to pick only what is needed and stage transformed data - ideally this is done incrementally.

rudolfix commented 1 year ago

This is clearly useful!

  1. we are planning to add S3 (and other bucket storage) support still in March. this will soon be ticketized here
  2. registration in catalogue services like AWS Glue... sounds like a powerful tool and a good idea! it all depends on when we can find more contributors :)

btw. (2) has been on our radar for some time. you could build such a pipeline with dlt + duckdb right now. we were experimenting with something like this:

json -> dlt -> duckdb -> dbt-duckdb

dbt-duckdb can materialize to parquet on an S3 bucket and register/update it in AWS Glue
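As an illustration of the first half of that pipeline, a minimal dlt-to-duckdb load could look like the sketch below (pipeline, dataset and table names are made up; the parquet-on-S3 and Glue registration steps would then be handled by dbt-duckdb):

```python
import dlt

# Load some JSON-like records into a local duckdb database;
# dbt-duckdb can later materialize this dataset as parquet on S3
# and register/update it in AWS Glue.
pipeline = dlt.pipeline(
    pipeline_name="api_events",
    destination="duckdb",
    dataset_name="bronze",
)

data = [
    {"id": 1, "event": "signup"},
    {"id": 2, "event": "purchase"},
]

load_info = pipeline.run(data, table_name="events")
print(load_info)
```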

@nicor88 are you building something similar (json - s3 - glue)? if so, and if you want to use dlt, we can assist you with a PoC.

nicor88 commented 1 year ago

@rudolfix We already have all of the above in place (except Snowflake external tables). We often extract data from APIs (JSON) and persist it to our bronze layer using AWS Data Wrangler, then we perform the remaining transformations using dbt-athena.

if so, and if you want to use dlt, we can assist you with a PoC.

I would say we are not interested for now; my interest in the above is just to help you build an amazing product that covers more use cases/setups :)

rudolfix commented 1 year ago

amazing :) to do the above we need a new destination type which is a bucket storage where you can pick the file format (i.e. parquet/json/csv) and the catalogue service to register with. this looks like a task for our core team. still, you could help us a lot with:

  1. requirements for how the whole thing should work (i.e. what is the layout of the files in the bronze layer, are they moved or deleted for any reason, etc.)
  2. if you have any code that you can contribute to register/update AWS Glue (or other catalogs) - that would speed us up

I'll keep you posted on this topic

nicor88 commented 1 year ago

@rudolfix I totally forgot to answer this.

  1. It would be amazing if you could add S3-based object storage destination support (make sure to support passing S3 endpoints so that S3-compatible systems like MinIO can be used, keeping it vendor-agnostic). I often see this setup in action even in small companies, as it's the easiest one to start with for raw data ingestion.

  2. Currently, to register a table definition in the Glue catalog, I use aws-sdk-pandas; here is one of the primary methods used to create Glue tables. Important to remark: once a Glue table is created, it can be queried from external systems that integrate with Glue: Athena, Trino, Spark, Dremio (just to mention a few).
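For reference, registering an existing set of parquet files as a Glue table with aws-sdk-pandas (awswrangler) might look roughly like this sketch - the database, table, path and column types are made-up placeholders:

```python
import awswrangler as wr

# Register a parquet location in the Glue catalog as a table; once registered
# it can be queried from Athena, Trino, Spark, Dremio, etc.
wr.catalog.create_parquet_table(
    database="bronze",
    table="events",
    path="s3://my-bucket/datalake/events/",
    columns_types={"id": "bigint", "event": "string", "created_at": "timestamp"},
    partitions_types={"dt": "string"},
)
```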

lukas-gust commented 1 year ago

A feature like this would make dlt fit perfectly into our architecture. At the moment we're trying to figure out how to use a Glue crawler to catalog the data landed in S3. So far it's not playing well because of the file name format in the filesystem destination. We will probably have to implement our own table creation process.

As an added bonus allowing the definition of partitions would be excellent.

rudolfix commented 1 year ago

@lukas-gust we plan to start working on this next week. the lack of a filesystem destination was the blocker and now we are ready. it would be very helpful to get any requirements for the file name format for filesystem from you. this is something that could be configured - currently we have a first attempt at it and we need feedback.

do you mean partitions in parquet or in the table catalogue? both are possible; in the first case we would need to copy a folder with partitioned parquet

rudolfix commented 1 year ago

@nicor88 not sure if you are still interested in this, but it would help if you could share code that creates and updates the AWS Glue catalogue. it looks trivial to add parquet files as external tables in Redshift and Snowflake, but I lack experience in how you manage updates to data and schemas in external tables. I'll talk with @adrianbr, but if you have any code to share that would be fantastic

edit: I see some examples for glue from April. I will dive in. thx :)

lukas-gust commented 1 year ago

@rudolfix As far as I know, for the Glue catalog integration it doesn't matter too much what the files look like, but it might simplify the implementation if a table was represented by a folder/key, e.g. /datalake/<table_name>/<load_id>.<file_id>.<file_format> or something. Then the Glue catalog could just reference the table directory and infer that all the files in there belong to the table when it's read. As it stands now I think there would be a challenge, but I'm not super familiar yet with awswrangler or the Glue Table API. Most of my experience is with Athena DDL statements.

By partitions I mean native to S3 and the Glue table and/or Athena. We could also go down the open table format route, which would be neat.

lukas-gust commented 1 year ago

@rudolfix We leverage external schemas in Redshift, meaning that if you can handle the schema drift in the Glue catalog then the external schema will just pick up the new external tables automatically, without having to write CREATE EXTERNAL TABLE statements. From the Redshift docs:

If your external table is defined in AWS Glue, Athena, or a Hive metastore, you first create an external schema that references the external database. Then you can reference the external table in your SELECT statement by prefixing the table name with the schema name, without needing to create the table in Amazon Redshift. For more information, see Creating external schemas for Amazon Redshift Spectrum.

So basically you'd catalog the table in Glue with the same mechanism and add the external schema to redshift. Though they do recommend you manage external dbs/schemas/tables through redshift.
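For illustration, registering a Glue database as an external schema in Redshift could look roughly like the sketch below (connection details, database name and IAM role ARN are made-up placeholders):

```python
import psycopg2

# Register the Glue database "bronze" as an external schema in Redshift;
# tables cataloged in Glue then become queryable as spectrum_bronze.<table>
# without any CREATE EXTERNAL TABLE statements.
create_external_schema = """
CREATE EXTERNAL SCHEMA IF NOT EXISTS spectrum_bronze
FROM DATA CATALOG
DATABASE 'bronze'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-spectrum-role'
CREATE EXTERNAL DATABASE IF NOT EXISTS;
"""

conn = psycopg2.connect(
    host="my-cluster.abc123.eu-west-1.redshift.amazonaws.com",
    port=5439,
    dbname="analytics",
    user="admin",
    password="...",
)
conn.autocommit = True  # run the DDL with autocommit
with conn.cursor() as cur:
    cur.execute(create_external_schema)
conn.close()
```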

Does that make sense?

rudolfix commented 1 year ago

So basically you'd catalog the table in Glue with the same mechanism and add the external schema to redshift. Though they do recommend you manage external dbs/schemas/tables through redshift.

Does that make sense?

yes, and adding AWS Glue as a destination does not look overly complicated, i.e. similar to what dbt-duckdb does here (https://github.com/jwills/dbt-duckdb/blob/master/dbt/adapters/duckdb/plugins/glue.py) - mentioning this because I know this code, it may not be the best example :)

going back to external Redshift tables - it looks like if we just add the EXTERNAL keyword and the location of the files to our existing Redshift destination, it will just work. so we'll investigate that as well

rudolfix commented 1 year ago

@rudolfix As far as I know, for the Glue catalog integration it doesn't matter too much what the files look like, but it might simplify the implementation if a table was represented by a folder/key, e.g. /datalake/<table_name>/<load_id>.<file_id>.<file_format> or something. Then the Glue catalog could just reference the table directory and infer that all the files in there belong to the table when it's read. As it stands now I think there would be a challenge, but I'm not super familiar yet with awswrangler or the Glue Table API. Most of my experience is with Athena DDL statements.

OK - we can let people choose their preferred layout by using format specifiers with placeholders like "{table_name}/{load_id}...", maybe with some more specifiers like the current day etc. I'll work on a ticket with requirements and I'll ping you to get feedback: https://github.com/dlt-hub/dlt/issues/510
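A sketch of how such a configurable layout could look from the user's side, assuming it is exposed as a filesystem destination setting via dlt's environment-based config (the exact option name and placeholders are hypothetical until the ticket above is settled; AWS credentials are omitted):

```python
import os

import dlt

# Hypothetical layout option: placeholders are resolved per written file,
# producing one folder per table in the bucket.
os.environ["DESTINATION__FILESYSTEM__LAYOUT"] = "{table_name}/{load_id}.{file_id}.{ext}"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "s3://my-bucket/datalake"

pipeline = dlt.pipeline(
    pipeline_name="api_events",
    destination="filesystem",
    dataset_name="bronze",
)
pipeline.run([{"id": 1, "event": "signup"}], table_name="events")
```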

By partitions I mean native to S3 and the Glue table and/or Athena. We could also go down the open table format route, which would be neat.

what do you mean by the open table route? that we maintain, e.g., an Iceberg/Hive table locally or in a bucket and maintain the schema ourselves? my knowledge there is quite limited

lukas-gust commented 1 year ago

OK - we can let people choose their preferred layout by using format specifiers with placeholders like "{table_name}/{load_id}...", maybe with some more specifiers like the current day etc. I'll work on a ticket with requirements and I'll ping you to get feedback: https://github.com/dlt-hub/dlt/issues/510

Very nice! I think this will add a healthy amount of flexibility with the filesystem destination.

RE: Redshift external schema/tables. How are you/we planning to handle the need for an IAM role? Or is that handled already by the Redshift destination? I.e. the external queries need policies in place to allow the Redshift user/service account to read the data in the S3 buckets.

what do you mean by the open table route? that we maintain, e.g., an Iceberg/Hive table locally or in a bucket and maintain the schema ourselves? my knowledge there is quite limited

RE: partitions. I worded that a bit weirdly; they were meant to be separate ideas. By mentioning open table formats I mean supporting (where possible) Hive, Iceberg, Hudi, Delta, etc. More or less allowing the 'merge' write disposition to be possible for the filesystem destination - OR is it a new destination at this point?

I'm only just learning more about Iceberg in particular, but I've used Delta tables before. Are there specific questions about schema evolution and management that you can ask so that I can help clarify?

rudolfix commented 1 year ago

re. Redshift: you can provide an IAM role, or if not provided, dlt will forward the AWS credentials used by staging to copy files to S3 (https://dlthub.com/docs/dlt-ecosystem/destinations/redshift#staging-support)
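For context, a Redshift pipeline with S3 staging can be set up roughly like this (names are placeholders; the bucket URL, AWS credentials and the optional IAM role come from dlt's config/secrets as described in the docs linked above):

```python
import dlt

# Load via an S3 staging bucket into Redshift: dlt first writes files to the
# bucket and then loads them into Redshift from there.
pipeline = dlt.pipeline(
    pipeline_name="api_events",
    destination="redshift",
    staging="filesystem",
    dataset_name="bronze",
)

pipeline.run([{"id": 1, "event": "signup"}], table_name="events")
```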

RE: partitions. I worded that a bit weirdly; they were meant to be separate ideas. By mentioning open table formats I mean supporting (where possible) Hive, Iceberg, Hudi, Delta, etc. More or less allowing the 'merge' write disposition to be possible for the filesystem destination - OR is it a new destination at this point?

This is most probably a new destination. To say more I'd need to do deeper research. We'd need a query engine somewhere to interact with those tables, e.g. https://aws.amazon.com/about-aws/whats-new/2023/07/amazon-redshift-querying-apache-iceberg-tables/ or https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html (this just looks like an option when creating tables)

I'm only just learning more about Iceberg in particular, but I've used Delta tables before. Are there specific questions about schema evolution and management that you can ask so that I can help clarify?

We are prioritizing tasks today. I'll get back to you with more questions.

lukas-gust commented 1 year ago

One thing to clarify: what I'm referring to regarding Redshift is the following quote from https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_EXTERNAL_TABLE.html

In addition to external tables created using the CREATE EXTERNAL TABLE command, Amazon Redshift can reference external tables defined in an AWS Glue or AWS Lake Formation catalog or an Apache Hive metastore. Use the CREATE EXTERNAL SCHEMA command to register an external database defined in the external catalog and make the external tables available for use in Amazon Redshift. If the external table exists in an AWS Glue or AWS Lake Formation catalog or Hive metastore, you don't need to create the table using CREATE EXTERNAL TABLE. To view external tables, query the SVV_EXTERNAL_TABLES system view.

This may be a desired integration for some, essentially being able to "load" data into Redshift by adding an external schema that is managed by, say, a Glue crawler that crawls event files landed by a Kinesis stream, or some other process outside of dlt. Thoughts?

We'd need a query engine somewhere to interact with those tables, e.g. https://aws.amazon.com/about-aws/whats-new/2023/07/amazon-redshift-querying-apache-iceberg-tables/ or https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html (this just looks like an option when creating tables)

Hence, in the case of AWS, Athena could enter as the query engine to manage the CRUD operations: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html

I don't imagine the destination would be something like 'iceberg'. It'd be more along the lines of 'athena' or 'presto'.
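As an illustration of those Athena-managed CRUD operations, an Iceberg upsert could be driven from Python roughly like the sketch below (database, table, staging table and bucket names are made up; the SQL follows Athena's Iceberg DDL/MERGE syntax):

```python
import awswrangler as wr

# Create an Iceberg table managed through Athena, then merge staged rows into
# it; Athena maintains the underlying parquet data and Iceberg metadata files.
wr.athena.start_query_execution(
    sql="""
    CREATE TABLE IF NOT EXISTS bronze.events (
        id bigint,
        event string,
        dt date)
    PARTITIONED BY (dt)
    LOCATION 's3://my-bucket/iceberg/events/'
    TBLPROPERTIES ('table_type'='ICEBERG')
    """,
    database="bronze",
    wait=True,
)

wr.athena.start_query_execution(
    sql="""
    MERGE INTO bronze.events t
    USING bronze.events_stage s ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET event = s.event, dt = s.dt
    WHEN NOT MATCHED THEN INSERT (id, event, dt) VALUES (s.id, s.event, s.dt)
    """,
    database="bronze",
    wait=True,
)
```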

sh-rp commented 1 year ago

@lukas-gust, @nicor88 I have been reading the AWS docs for Athena and Glue, and to me it seems at this point (and please correct me if I am wrong, I have just started on this topic) that an S3 + Athena destination would make sense for us. dlt knows the schema of the current load package and can use Athena's HiveQL DDL statements to create and update table schemas based on sets of S3 files. There is no need for the Glue crawler to infer the schema, and you can then query your tables via Athena and, by extension, also from Glue. Does what I write make sense to you, or am I missing something? I will also think about how to add partitions. Most likely we could add a field on the resource to define the keys to partition on, but that would also mean extending the dlt schema with these partition keys, so it's probably a bigger project.

I think these table definitions created in Athena are also stored in the glue catalog anyway and should be available there without further configuration.

lukas-gust commented 1 year ago

@sh-rp You're 100% correct. It's also possible to use the Glue API or, as @nicor88 mentioned, the awswrangler library to create tables and such.

The Glue crawler was meant to be an example, but it is not necessary and would always be in the hands of the implementation, not dlt. And if #510 is implemented faster, that is what I will be utilizing in the short term. At the moment I'm planning my implementation around building my own destination as best I can.

Partitions are a nice-to-have IMO. The catch is that you can only add 100 physical partitions at a time in Athena, so if there were more, say for a full refresh or something, you'd have to batch the partition ADD statements, which I have done by hand at one point. It also looks like if you enforced Hive-style partitions you would not have to do so, but you would need to run a repair command, so there might be some control there. https://docs.aws.amazon.com/athena/latest/ug/partitions.html
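To make the batching point concrete, here is a rough sketch of chunking partition registration to stay under Athena's 100-partitions-per-ALTER-statement limit (table, bucket and partition values are made up; with Hive-style key=value folders a repair command can be used instead):

```python
import boto3

athena = boto3.client("athena", region_name="eu-west-1")

def run(sql: str) -> None:
    # Fire-and-forget for brevity; a real implementation would poll the query state.
    athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "bronze"},
        ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},
    )

# Pretend we have many daily partitions to register for bronze.events.
partitions = [f"2023-01-{day:02d}" for day in range(1, 32)]

# Athena accepts at most 100 partitions per ALTER TABLE ... ADD statement.
for i in range(0, len(partitions), 100):
    chunk = partitions[i : i + 100]
    clauses = "\n".join(
        f"PARTITION (dt='{dt}') LOCATION 's3://my-bucket/datalake/events/dt={dt}/'"
        for dt in chunk
    )
    run(f"ALTER TABLE bronze.events ADD IF NOT EXISTS\n{clauses}")

# With Hive-style partition folders, partition discovery can be done instead with:
# run("MSCK REPAIR TABLE bronze.events")
```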

Athena and the Glue Catalog are synonymous: if you add something via one, it's available from the other. One gripe I have is that it's only a two-level namespace (database.table).

sh-rp commented 1 year ago

@lukas-gust thanks for your detailed answer. After more reading it seems like ticket 510 is necessary in any case, as it seems Athena tables always reference a full folder and never a subset of items in a folder, so we might change our default filesystem storage output to have one "folder" per table anyway. Will keep you posted here or on Slack about our further thoughts and progress on this.

sh-rp commented 1 year ago

@lukas-gust I have updated the tickets with my thoughts, feel free to let me know if there is stuff missing or anything else comes to mind.

sh-rp commented 1 year ago

@lukas-gust another question: would there be any sense in writing all the data into Iceberg tables? What I have in mind now is uploading parquet files to S3 and referencing the folders with Athena/Glue Catalog. An alternative seems to be to create Iceberg tables and update those via the SQL interface. I think this might have a few upsides with regards to merging datasets, but it seems this approach might actually be quite slow in the end.

lukas-gust commented 1 year ago

TL;DR: landing as JSON or Parquet is perfectly acceptable IMO. Iceberg would be a nice addition for merging. Partitioning is important to optimize cost and performance.

I think it's better to leave the extracted data closest to its original form; this is a general approach I prefer and isn't the end-all-be-all. I could see an implementation with Iceberg tables being helpful, but they're managed tables, so Iceberg owns the files, whereas the default Hive table is just a reference to a set of files.

I think the biggest benefit you get with loading via Iceberg is merging capability and maybe some partition mechanics. I'm not ultra-familiar with Iceberg yet. Not sure what slowness you might be referring to.

dbt-athena allows both, where merging updates is only possible via Iceberg tables: https://github.com/dbt-athena/dbt-athena#iceberg. But I don't think it started there.