aws / aws-cdk

The AWS Cloud Development Kit is a framework for defining cloud infrastructure in code
https://aws.amazon.com/cdk
Apache License 2.0

Athena Create Table #3665

Open rhboyd opened 5 years ago

rhboyd commented 5 years ago

The construct would then use the AWS SDK Custom Resource to call the Athena API and execute something like:

querystring = """
  CREATE EXTERNAL TABLE IF NOT EXISTS mydb.table_name (
    `columns[0][0]` columns[0][1],
    `columns[1][0]` columns[1][1]
    etc...
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES (
    {data_format}
  ) LOCATION 's3://{bucket_name}/{path}'
  TBLPROPERTIES ('has_encrypted_data'='false');
"""
response = client.start_query_execution(
    QueryString=querystring,
    ClientRequestToken='string',
    QueryExecutionContext={
        'Database': 'string'
    },
    ResultConfiguration={
        'OutputLocation': 'string',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_S3'|'SSE_KMS'|'CSE_KMS',
            'KmsKey': 'string'
        }
    },
    WorkGroup='string'
)
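
The `columns[i][j]` placeholders above are pseudocode; filling them in is ordinary string formatting. A minimal sketch under that reading (the helper name, column list, bucket, and path are invented for illustration):

```python
def build_create_table_ddl(database, table, columns, bucket_name, path):
    """Render an Athena CREATE EXTERNAL TABLE statement from (name, type) pairs."""
    column_defs = ",\n    ".join(f"`{name}` {col_type}" for name, col_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} (\n"
        f"    {column_defs}\n"
        ")\n"
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n"
        f"LOCATION 's3://{bucket_name}/{path}'\n"
        "TBLPROPERTIES ('has_encrypted_data'='false');"
    )

# Example rendering with made-up columns
ddl = build_create_table_ddl(
    "mydb", "table_name",
    [("event_id", "string"), ("created_at", "timestamp")],
    "my-bucket", "events/",
)
```

The rendered string would then be passed as `QueryString` to `start_query_execution` as in the snippet above.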
rhboyd commented 5 years ago

This could also work for creating a database in Athena.

beuleal commented 5 years ago

Any updates?

jonny-rimek commented 4 years ago

I'd love to see this as well, but it's a CFN limitation, not a CDK one.

rhboyd commented 4 years ago

This can definitely be addressed by CDK, I just don’t have the time to implement it. I’d be glad to give someone some pointers for how to get started if they wanted to pick it up.

PikeDev commented 4 years ago

This would definitely be an interesting implementation. @rhboyd how would the implementation deal with the user changing the table definition in code and then updating the stack? The IF NOT EXISTS part means the table will not be recreated. There is also no REPLACE mechanism that I am aware of. In a project I'm currently doing we just run DROP before the CREATE. Would that suffice here also or is it too destructive?
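
The drop-before-create approach @PikeDev describes could live in a custom-resource handler that branches on the CloudFormation event type. A rough sketch only: the event shape follows CloudFormation custom-resource events, but `run_query`, the property names, and the handler itself are illustrative, not a real provider contract:

```python
def on_event(event, run_query):
    """Naive custom-resource handler: DROP before CREATE on update, so that
    'IF NOT EXISTS' does not silently keep a stale table definition."""
    props = event["ResourceProperties"]
    table = props["TableName"]
    if event["RequestType"] in ("Update", "Delete"):
        # Destructive for the table metadata only; for EXTERNAL tables
        # the underlying S3 data is untouched.
        run_query(f"DROP TABLE IF EXISTS {table};")
    if event["RequestType"] in ("Create", "Update"):
        run_query(props["CreateStatement"])

# Usage with a stub query runner that just records the statements issued
executed = []
on_event(
    {"RequestType": "Update",
     "ResourceProperties": {"TableName": "mydb.events",
                            "CreateStatement": "CREATE EXTERNAL TABLE ..."}},
    executed.append,
)
```

Whether dropping is "too destructive" depends on whether anything else references the table metadata; the data itself survives for external tables.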

guilarteH commented 4 years ago

I'm still new to Athena, but I realised you can create a table with Glue. There are new glue.Table() and glue.Database() constructs: https://docs.aws.amazon.com/cdk/api/latest/docs/aws-glue-readme.html

I am however struggling to make it work if my table has an S3 bucket location and the S3 bucket has encryption enabled. It works like a charm with unencrypted buckets.

I'm still investigating, but I'm pretty sure this condition here is wrong and I might need to report a bug: https://github.com/aws/aws-cdk/blob/master/packages/%40aws-cdk/aws-glue/lib/table.ts#L363

jonny-rimek commented 4 years ago

Very interesting. I always used to create it through the Athena setup wizard and save the SQL somewhere.

I have a bucket with a bunch of Parquet files, all in the same format; sounds like it would be a fit.

@guilarteH would you mind sharing the example that works for non encrypted buckets?

guilarteH commented 4 years ago

@jonny-rimek I created a simplified example, hope this works for you. Change the database region to yours, I have eu-central-1 there.

import s3 = require("@aws-cdk/aws-s3");
import glue = require("@aws-cdk/aws-glue");

...

// Note: fromDatabaseArn expects a Glue database ARN (the account ID here is a placeholder),
// not an Athena workgroup ARN
let database = glue.Database.fromDatabaseArn(this, "athenaDefaultDb", "arn:aws:glue:eu-central-1:123456789012:database/default");

const bucket = s3.Bucket.fromBucketAttributes(this, "ImportedBucket", {
    bucketArn: "arn:aws:s3:::events-bucket"
});

new glue.Table(this, "athenaTable", {
    bucket: bucket,
    columns: [
        {name: "string_column", type: glue.Schema.STRING},
        {name: "timestamp_column", type: glue.Schema.TIMESTAMP},
        {name: "integer_columns", type: glue.Schema.INTEGER},
        {name: "decimal_columns", type: glue.Schema.decimal(15, 4)}
    ],
    compressed: true,
    dataFormat: glue.DataFormat.Parquet,
    database: database,
    description: "My table description",
    partitionKeys: [{name: "partition_key", type: glue.Schema.STRING}],
    s3Prefix: "events/parquet/my_table_name/",
    tableName: "my_table_name_cdk",
});

shwetajoshi601 commented 3 years ago

I have CSV files that will be stored in s3 by some lambda function. I need to query these files using Athena.

I want to create a table with dynamic ID partitioning in the following format:

CREATE EXTERNAL TABLE IF NOT EXISTS resource_data_dyn_partitioned (
             col_id STRING,
             numeric_value DOUBLE,
             timestamp DATE
)
PARTITIONED BY (log_id STRING)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
LOCATION '<s3 folder ARN>'
TBLPROPERTIES ("skip.header.line.count"="1",
               "projection.enabled" = "true",
               "projection.log_id.type" = "injected",
               "storage.location.template" = "<s3 folder arn>/${log_id}"
);

How can I do it with CDK? Do I need to create it using AWS Glue? If yes, could somebody share an example in TypeScript?

guilarteH commented 3 years ago

Hey @shwetajoshi601, here's a snippet from our project. You can do it with the Glue constructs:

        this.database = new glue.Database(this, "SOME_NAME", {
            databaseName: "SOME_NAME"
        });

        this.table = new glue.Table(this, "athenaTable", {
            database: this.database,
            ...
            ...
            columns: [
                {name: "type", type: glue.Schema.STRING},
                {name: "classification", type: glue.Schema.STRING},
                {name: "timestamp", type: glue.Schema.TIMESTAMP}
            ],
            partitionKeys: [{name: "partition_key", type: glue.Schema.STRING}]
            });

Hope this helps

shwetajoshi601 commented 3 years ago


Thank you so much for your reply! I tried the above snippet, and it was indeed helpful: it creates the database and the table. However, it does not do dynamic ID partitioning. I do not want to add the partition key to the table schema. I have folders in my S3 bucket named after the partition key, and all the files in those folders share the same schema. I need to be able to query by log_id (I thought of partitioning so that Athena looks only at the folders matching the log_id instead of scanning the entire bucket). But the snippet above partitions the table and also adds the partition key (log_id in my case) to the table schema (simple partitioning). I have specified the prefix as "${log_id}", but this does not work.

I am struggling to get dynamic ID partitioning working with CDK. There is very little documentation and there are few examples. :( Help would be appreciated!

guilarteH commented 3 years ago

Hi @shwetajoshi601, I think the problem is in your S3 folder name structure for the partition id. It can of course be dynamic, but it HAS TO follow the convention of being named log_id={log_id}... So your partition key would be called log_id in the table definition, but the S3 "folder" would be named as I mentioned.

In the case of my example, my S3 folders are called partition_key=20201105 for today.

It took me a while to understand this at first too

I.e. s3://bucket/partition_key=20201105/file.parquet
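
The Hive-style `key=value` folder convention described above can be generated when writing objects; a small sketch (the helper name, prefix, and file names are invented):

```python
def partitioned_key(prefix, partitions, filename):
    """Build an S3 object key using Hive-style key=value partition folders,
    which Glue/Athena can discover without partition projection."""
    parts = "/".join(f"{k}={v}" for k, v in partitions)
    return f"{prefix}/{parts}/{filename}"

key = partitioned_key("events", [("partition_key", "20201105")], "file.parquet")
# → "events/partition_key=20201105/file.parquet"
```

Writers (a Lambda, a Firehose prefix template, etc.) that emit keys in this shape make the partitions discoverable by `MSCK REPAIR TABLE` or a Glue crawler.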

shwetajoshi601 commented 3 years ago

Thank you so much, @guilarteH, I tried what you mentioned. It does not work for me. I have tried dynamic ID partitioning manually from the console. It does not require you to name the folders in this format - log_id=value. Here is how I did it manually from the console: I have a bucket named "data" with folders named 151001, 151002.

(screenshot of the "data" bucket's folder structure)

I created a table using dynamic ID partitioning using the following query:

CREATE EXTERNAL TABLE IF NOT EXISTS my_table (
         data_column_id STRING,
         numeric_value DOUBLE,
         timestamp STRING
)
PARTITIONED BY (log_id STRING)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
LOCATION 's3://data/'
TBLPROPERTIES ("skip.header.line.count"="1",       
               "projection.enabled" = "true",
               "projection.log_id.type" = "injected",
               "storage.location.template" = "s3://data/${log_id}"
              );

Now, query the table using a SELECT statement:

SELECT * FROM "mydatabase"."my_table" where log_id='151001';

This works and gives the following columns in the response:

  1. data_column_id
  2. numeric_value
  3. timestamp

Following are the table properties as displayed in the AWS Glue console:

(screenshot of the table properties)

Following is the schema of the table:

(screenshot of the table schema)

Here is what I could add based on the classes and types mentioned in CDK:

const athenaTable = new glue.Table(this, 'ResourceDataTable', {
      bucket: resourceDataBucket,
      database: athenaDb,
      tableName: athenaTableName,
      columns: [
        { name: 'data_column_id', type: glue.Schema.STRING },
        { name: 'numeric_value', type: glue.Schema.DOUBLE },
        { name: 'timestamp', type: glue.Schema.TIMESTAMP },
      ],
      dataFormat: new glue.DataFormat({
        inputFormat: new glue.InputFormat(
          'org.apache.hadoop.mapred.TextInputFormat'
        ),
        outputFormat: new glue.OutputFormat(
          'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        ),
        serializationLibrary: glue.SerializationLibrary.LAZY_SIMPLE,
      }),
      partitionKeys: [{ name: 'log_id', type: glue.Schema.STRING }],
      s3Prefix: '${log_id}',
    });

How can I add the projection information here (as mentioned in TBLPROPERTIES in the SQL query)? I do not see any relevant properties in any of the mentioned classes/interfaces to do that. Does CDK have any support for projections? (dynamic ID partitioning)
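
For what it's worth, the projection settings are conceptually just string parameters on the table; the key names follow the TBLPROPERTIES in the query above. A sketch of building that parameter map (the helper itself is hypothetical, not a CDK API):

```python
def injected_projection_params(partition_key, location_template):
    """Table parameters enabling Athena partition projection with an
    'injected' partition key (values must be supplied in the WHERE clause)."""
    return {
        "projection.enabled": "true",
        f"projection.{partition_key}.type": "injected",
        "storage.location.template": location_template,
    }

params = injected_projection_params("log_id", "s3://data/${log_id}")
```

A map like this is what would need to land in the table's Parameters for projection to take effect; the question is how to attach it through the L2 construct.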

I have gone through the relevant pages in the docs. Please let me know if I am missing something.

chuckbenger commented 3 years ago

Yeah, I'm looking for this feature as well. I did the same dynamic partitioning through the console using TBLPROPERTIES and can't seem to do it using CDK. It seems like I'll have to use a Glue crawler as a workaround for now.

chuckbenger commented 3 years ago

Found another workaround: you can use the CFN (L1) object instead of the CDK L2 one, as shown below.

https://aws.amazon.com/about-aws/whats-new/2020/06/amazon-athena-supports-partition-projection/


new glue.CfnTable(this, "TestTable", {
            databaseName: database.databaseName,
            catalogId: "ACCOUNT_ID",
            tableInput: {
                name: "test",
                tableType: "EXTERNAL_TABLE",
                parameters: {
                    "projection.enabled": "true",
                    "projection.version.type": "enum",
                    "projection.version.values": "v2",
                    "projection.dataset-date.type": "date",
                    "projection.dataset-date.range": "2020-12-18,NOW",
                    "projection.dataset-date.format": "yyyy-MM-dd",
                    "projection.dataset-date.interval": "1",
                    "projection.dataset-date.interval.unit": "DAYS",
                    "projection.something.type": "enum",
                    "projection.something.values": "1,2,3,4,5,6,7",
                    "projection.recommend.type": "enum",
                    "projection.recommend.values": "true,false",
                    "storage.location.template": "s3://bucket/version=${version}/dataset-date=${dataset-date}/something=${something}/recommend=${recommend}/"
                },
                storageDescriptor: {
                    columns: [
                        {
                            "name": "something",
                            "type": "string"
                        },
                        {
                            "name": "something",
                            "type": "string"
                        },
                        {
                            "name": "something",
                            "type": "string"
                        },
                        {
                            "name": "something",
                            "type": "string"
                        },
                        {
                            "name": "something",
                            "type": "array<string>"
                        },
                        {
                            "name": "something",
                            "type": "array<string>"
                        },
                    ],
                    serdeInfo: {
                        serializationLibrary: "org.apache.hive.hcatalog.data.JsonSerDe"
                    },
                    location: "s3://your_bucket",
                    inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
                    outputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
                },
                partitionKeys: [
                    {
                        "name": "version",
                        "type": "string"
                    },
                    {
                        "name": "dataset-date",
                        "type": "date"
                    },
                    {
                        "name": "something",
                        "type": "string"
                    },
                    {
                        "name": "recommend",
                        "type": "string"
                    }
                ]
            }
        })
hgrgic commented 3 years ago

I have recently noticed that this post is still open while looking for approaches to a similar problem.

Both @shwetajoshi601 and @tkblackbelt suggest good approaches. What I found to work best for me was to rely on the concept of Escape Hatches in CDK where you can easily extend CDK code with custom overrides that are not yet supported in L2 constructs.

In this case you could:

  1. Follow @shwetajoshi601 approach in the CDK definition to extract the maximum from the CDK L2 construct.
  2. Extend that object by adding custom property overrides via CDK EscapeHatch code like:
    CfnTable cfnTable = (CfnTable) athenaTable.getNode().getDefaultChild();
    cfnTable.addPropertyOverride("TableInput.Parameters.projection\\.enabled", true);
    cfnTable.addPropertyOverride("TableInput.Parameters.projection\\.year\\.type", "injected");
    cfnTable.addPropertyOverride("TableInput.Parameters.projection\\.month\\.type", "injected");
    cfnTable.addPropertyOverride("TableInput.Parameters.projection\\.day\\.type", "injected");
    cfnTable.addPropertyOverride("TableInput.Parameters.storage\\.location\\.template", String.format("s3://%s/${year}/${month}/${day}", inputBucket.getBucketName()));

    My example is in Java, but escape hatches are supported in all CDK languages.

WtfJoke commented 1 year ago

We've used @aws-cdk/aws-glue-alpha successfully to create a table (in Athena). Might be interesting to people who follow this issue. We needed to use escape hatches for a few properties.

const table = new glueAlpha.Table(this, "GlueEvents", {
            database,
            bucket,
            columns: [
                {
                    name: "eventId",
                    type: glueAlpha.Schema.STRING,
                },
                {
                    name: "createdAt",
                    type: glueAlpha.Schema.STRING,
                    comment: "ISO-8601 Timestamp",
                },
            ],
            dataFormat: glueAlpha.DataFormat.CSV,
            description: `Events collected via S3 bucket ${bucket.bucketName}.`,
});

// Set table properties to skip the header line (since it's not possible with the current L2 construct)
const cfnTable = table.node.defaultChild as CfnTable;
cfnTable.addPropertyOverride("TableInput.Parameters", {
    "skip.header.line.count": "1",
});