awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

DynamicFrame.map() fails when schema contains a Map(String,Array(String)) #102

Open macamhi opened 2 years ago

macamhi commented 2 years ago

While developing an evolution of an existing PySpark job (Glue version 2.0) with a Glue DevEndpoint, I ran into an error I cannot explain: a java.lang.ClassCastException is thrown in an Executor log when I apply a transformation (transform_timestamps) to the final DynamicFrame assignedFormDF through a map() call. This transformation worked before I added a new data structure, addInfos, to the DynamicFrame, which has the following structure:

 |-- addInfos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- containerId: integer (nullable = true)
 |    |    |-- containerLabel: string (nullable = true)
 |    |    |-- addInfoMap: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)

The transform is not supposed to interact with this data structure at this time!

So I tried to reproduce this, and obtained the same exception (java.lang.ClassCastException: java.lang.String cannot be cast to java.util.ArrayList) with a one-record sample and a reduced schema, with the map transform being a simple no-op lambda; see the code below (I cannot attach the notebook). Could you please investigate? This issue is blocking my development.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *

try:
    # In a scheduled job run, JOB_NAME is passed on the command line
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    glueContext = GlueContext(SparkContext.getOrCreate())
    spark = glueContext.spark_session
    spark.conf.set("spark.sql.broadcastTimeout", 900)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
except Exception as e:
    # Fallback for interactive use (DevEndpoint/notebook), where JOB_NAME is absent
    glueContext = GlueContext(SparkContext.getOrCreate())
    spark = glueContext.spark_session
    job = None

# Sample data with the problematic (but needed) schema
data = [
    {
        "assignedFormId":1,
        "addInfos":[
            {
                "containerId":1,
                "containerLabel": "testLabel",
                "addInfoMap": {
                    "General Info": ["test1","test2"],
                    "otherEquipment": ["Boiler 1", "Boiler 2", "Boiler 3"]
                }
            }
        ]
    }
]
schema = StructType([
    StructField("assignedFormId", IntegerType(), True),
    StructField("addInfos", ArrayType(StructType([
        StructField("containerId", IntegerType(), True),
        StructField("containerLabel", StringType(), True),
        StructField("addInfoMap", MapType(StringType(), ArrayType(StringType(), True)), True)
    ])))
])

# We create a Spark DataFrame from this data and schema
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show(truncate=False)

# We then create a Glue DynamicFrame from this DataFrame
gdf = DynamicFrame.fromDF(df, glueContext, "gdf")
gdf.printSchema()
print(gdf.count())
gdf.show(1)

# The issue appears when we apply a mapping function, even a very simple one!
gdf2 = gdf.map(lambda x: x) # Should return the record untouched, but...
gdf2.printSchema()
gdf2.count() # and then, this fails with a java.lang.ClassCastException: java.lang.String cannot be cast to java.util.ArrayList

if job is not None:
    job.commit()
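
A possible workaround while this is investigated (just a sketch, assuming the per-record logic can be rewritten against Spark Row objects instead of DynamicRecords): run the function on the underlying DataFrame's RDD and rebuild the DynamicFrame afterwards, which bypasses DynamicFrame.map()'s record conversion:

# Workaround sketch (not verified on Glue 2.0): map over the Spark RDD instead
df2 = gdf.toDF()
rdd2 = df2.rdd.map(lambda row: row)  # no-op stand-in for the real transform
df3 = spark.createDataFrame(rdd2, df2.schema)
gdf3 = DynamicFrame.fromDF(df3, glueContext, "gdf3")
print(gdf3.count())  # should succeed, since DynamicFrame.map() is never invoked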
bokoio commented 7 months ago

I have the same issue: the function applied via map() does not take the new mapping into account.