awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

unnest_ddb_json should also unnest nested objects #172

Open gjgarryuan opened 1 year ago

gjgarryuan commented 1 year ago

Hey, I am working on a Glue job to export DynamoDB data into S3 using the new DDB export connector.

Glue version: 4.0
Language: Python 3

Script:

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession

args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "ddb_table_arn", "glue_database_name", "glue_table_name", "s3_bucket_name", "s3_prefix"]
)
job_name = args["JOB_NAME"]
ddb_table_arn = args["ddb_table_arn"]
glue_database_name = args["glue_database_name"]
glue_table_name = args["glue_table_name"]
s3_bucket_name = args["s3_bucket_name"]
s3_prefix = args["s3_prefix"]
s3_ddbexport_prefix = "temporary/ddbexport/"

glueContext = GlueContext(SparkSession.builder.getOrCreate())
job = Job(glueContext)
job.init(job_name, args)

dyf_ddb = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": s3_bucket_name,
        "dynamodb.s3.prefix": s3_ddbexport_prefix,
        "dynamodb.tableArn": ddb_table_arn,
        # "dynamodb.unnestDDBJson": True,
    },
    transformation_ctx="ddb_export",
)

dyf_ddb.printSchema()

dyf_ddb_unnested = dyf_ddb.unnest_ddb_json(transformation_ctx="unnest_ddb_frame")
dyf_ddb_unnested.printSchema()

s3_sink = glueContext.getSink(
    connection_type="s3",
    path=f"s3://{s3_bucket_name}/{s3_prefix.strip('/')}/",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transformation_ctx="s3_sink",
)
s3_sink.setFormat(format="parquet", useGlueParquetWriter=True)
s3_sink.setCatalogInfo(catalogDatabase=glue_database_name, catalogTableName=glue_table_name)
s3_sink.writeFrame(dyf_ddb_unnested)

job.commit()

The schema before unnest_ddb_json is:

root
|-- Item: struct
|    |-- id: struct
|    |    |-- S: string
...
|    |-- data: struct
|    |    |-- M: struct
|    |    |    |-- arn: struct
|    |    |    |    |-- S: string
...
|    |-- account: struct
|    |    |-- M: struct
|    |    |    |-- accountName: struct
|    |    |    |    |-- S: string
|    |    |    |-- accountId: struct
|    |    |    |    |-- S: string
|    |    |    |-- activated: struct
|    |    |    |    |-- NULL: boolean
...
|    |-- createdDate: struct
|    |    |-- S: string
|    |-- updatedDate: struct
|    |    |-- S: string

and after the unnest:

root
|-- id: string
...
|-- data: struct
|    |-- arn: struct
|    |    |-- S: string
|-- arn: string
...
|-- account: struct
|    |-- accountName: struct
|    |    |-- S: string
|    |-- accountId: struct
|    |    |-- S: string
|    |-- activated: struct
|    |    |-- NULL: boolean
|-- accountName: string
|-- accountId: string
...
|-- createdDate: string
|-- updatedDate: string

As you can see above, the top-level scalar attributes (id, createdDate, updatedDate) are unnested correctly, but the map attributes (data and account) are only partially handled: their inner fields still carry the leaf type descriptors (S, NULL), and the leaf values additionally show up a second time as flattened top-level columns (arn, accountName, accountId).

Is this behaviour expected?

gjgarryuan commented 1 year ago

In addition, another finding is that the DynamoDB JSON number attribute (a numeric string under the N descriptor) gets converted directly into a string rather than a numeric type.

Before:

root
|-- Item: struct
|    |-- version: struct
|    |    |-- N: string

After:

root
|-- version: string
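
A possible workaround (an editor's sketch, not from this thread; it assumes the dyf_ddb_unnested frame and the version column from the script above) is to cast such columns back to a numeric type with DynamicFrame.resolveChoice:

# Cast the unnested "version" column from string back to long.
dyf_ddb_unnested = dyf_ddb_unnested.resolveChoice(
    specs=[("version", "cast:long")],
    transformation_ctx="cast_version",
)
dyf_ddb_unnested.printSchema()
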
jarekkoziol commented 1 year ago

Similar issue for me. For instance, this original structure:

root
|-- Item: struct
|    |-- Id: struct
|    |    |-- S: string
|    |-- EntityName: struct
|    |    |-- S: string
|    |-- Message: struct
|    |    |-- M: struct
|    |    |    |-- MessageType: struct
|    |    |    |    |-- S: string
|    |    |    |-- MessageData: struct
|    |    |    |    |-- M: struct
|    |    |    |    |    |-- UserName: struct
|    |    |    |    |    |    |-- S: string
|    |    |    |    |    |-- At: struct
|    |    |    |    |    |    |-- S: string

gets converted into

root
|-- Id: string
|-- EntityName: string
|-- Message: struct
|    |-- MessageType: struct
|    |    |-- S: string
|    |-- MessageData: struct
|    |    |-- M: struct
|    |    |    |-- UserName: struct
|    |    |    |    |-- S: string
|    |    |    |-- At: struct
|    |    |    |    |-- S: string

It is DynamoDB-dedicated, yet it removes the DynamoDB type indicators (e.g. "S", "M") only at the first level, not throughout the nested data. It would be great if it removed the type indicators while keeping the structure (or conditionally flattened the data completely).

I'm migrating data from one DynamoDB table to another, and I need to add some additional data during the migration. It seems like I can't use unnest_ddb_json to prepare the data, so I have to write my own function.
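
One possible shape for such a function, as a minimal editor's sketch: it walks a plain DynamoDB-JSON dict and strips the type descriptors while keeping the structure. (boto3's TypeDeserializer, suggested in the next comment, does the same job more robustly, including binary and set types.)

def strip_ddb_types(value):
    # Every DynamoDB-JSON attribute is a single-key dict whose key is the
    # type descriptor, e.g. {"S": "..."}, {"M": {...}}, {"L": [...]}.
    if isinstance(value, dict) and len(value) == 1:
        tag, inner = next(iter(value.items()))
        if tag == "M":
            return {k: strip_ddb_types(v) for k, v in inner.items()}
        if tag == "L":
            return [strip_ddb_types(v) for v in inner]
        if tag == "NULL":
            return None
        return inner  # S, N, BOOL, ...; note N stays a numeric string here
    return value

# ddb_item is a hypothetical DynamoDB-JSON item, e.g. record["Item"]:
plain_item = {k: strip_ddb_types(v) for k, v in ddb_item.items()}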

chrisruddick commented 1 year ago

I'm facing the same problem. Did anyone find a workaround for this issue?

Kasra-G commented 1 year ago

The best way around this is the following sample code:

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()
def mapping_function(record):
    record = {k: deserializer.deserialize(value=v) for k,v in record['Item'].items()}

dyf_ddb_unnested = dyf_ddb.map(mapping_function, transformation_ctx="unnest_ddb_frame")

Note that you may have to explicitly convert number-type columns to int or float. The boto3 TypeDeserializer turns DynamoDB numbers into Decimal objects to preserve the accuracy of the number type.
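
A sketch of that conversion (an editor's example, not from the thread; the to_native helper is hypothetical and only converts top-level values, not numbers nested inside maps or lists):

from decimal import Decimal

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

def to_native(value):
    # TypeDeserializer returns Decimal for DynamoDB's N type; convert to
    # int when the value is whole, float otherwise.
    if isinstance(value, Decimal):
        return int(value) if value % 1 == 0 else float(value)
    return value

def mapping_function(record):
    return {k: to_native(deserializer.deserialize(value=v))
            for k, v in record["Item"].items()}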

ysfaran commented 9 months ago

@Kasra-G's solution was a great hint, but it didn't work out of the box for me.

I needed to call .items() on record['Item'] before iterating, and also return the new record instead of only assigning it:

- record = {k: deserializer.deserialize(value=v) for k,v in record['Item']}
+ return {k: deserializer.deserialize(value=v) for (k,v) in record['Item'].items()}
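
Putting the two fixes together, and reusing the names from the snippet above, the mapping becomes:

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

def mapping_function(record):
    # Deserialize every attribute of the exported item and return the result.
    return {k: deserializer.deserialize(value=v) for k, v in record["Item"].items()}

dyf_ddb_unnested = dyf_ddb.map(mapping_function, transformation_ctx="unnest_ddb_frame")
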
pwrstar commented 6 months ago

After a lot of googling for help, and a lot of pain trying to convert DynamoDB JSON myself (for some reason, the solution above was giving all NULLs in my case), I found the method simplify_ddb_json(): DynamicFrame, which might help others with the conversion. Here is the link: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-simplify
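
For reference, the method is called directly on the DynamicFrame and, per the linked page, takes no arguments (a minimal sketch, assuming the dyf_ddb frame from the original script):

# simplify_ddb_json() returns a new DynamicFrame with the DynamoDB type
# descriptors removed throughout the nested structure.
dyf_simplified = dyf_ddb.simplify_ddb_json()
dyf_simplified.printSchema()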

kennedymota commented 6 months ago

> After a lot of googling for help, and a lot of pain trying to convert DynamoDB JSON myself (for some reason, the solution above was giving all NULLs in my case), I found the method simplify_ddb_json(): DynamicFrame, which might help others with the conversion. Here is the link: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-simplify

This can also help @pwrstar, but just pay attention to this point: "If there are several types or types of Map in a type of List, the List elements will not be simplified".