awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

DynamicFrame.map() touches timestamps! #103

Open macamhi opened 3 years ago

macamhi commented 3 years ago

Another issue with DynamicFrame.map(): the datetime held in a timestamp field is altered when a map() transform is applied, even when the transform is a no-op. More specifically, the milliseconds part changes if it begins with a 0, e.g. .044 becomes .44. I suspect a buggy number-to-string-to-number conversion of the milliseconds. See the attached notebook export BugTimestamp.pdf and the code below (Glue version 2.0).

# %%
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *

# %%
import sys

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
spark.conf.set("spark.sql.broadcastTimeout", 900)
try:
    # JOB_NAME is only passed when this runs as a Glue job, not in a notebook
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
except Exception:
    # Running interactively: skip job initialisation
    pass

# %%
# Sample data with just a timestamp in milliseconds
data = [
    {
        "timestamp":1615846851044
    }
]

# %%
# We create a Spark DataFrame from this data
df = spark.createDataFrame(data)
df.printSchema()
df.show(truncate=False)

# %%
df2 = df.withColumn('timestamp',(df.timestamp / 1000).cast('timestamp'))
df2.printSchema()
df2.show(truncate=False)

# %%
# We then create a Glue DynamicFrame from this DataFrame
gdf = DynamicFrame.fromDF(df2, glueContext, "gdf")
gdf.printSchema()
gdf.show()  # shows nothing here; convert back to a DataFrame to see the rows

# %%
gdf.toDF().show(truncate=False)

# %%
# The issue appears when we apply a mapping function, even a very simple one!
gdf2 = gdf.map(lambda x: x) # Should return the record untouched, but...
gdf2.printSchema()
gdf2.toDF().show(truncate=False)
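
The suspicion stated in the report, a number-to-string-to-number conversion that drops the zero padding of the milliseconds, can be sketched in plain Python. This is only an illustration of the hypothesis, not Glue's actual code: `roundtrip` and `render_utc` are hypothetical helpers, and the output is rendered in UTC, whereas Spark's `show()` uses the session time zone.

```python
from datetime import datetime, timedelta, timezone


def roundtrip(ts_ms, pad):
    """Write epoch milliseconds as "seconds.millis" text and parse it back.

    With pad=False the millisecond field is written without leading zeros,
    which is the hypothetical bug being illustrated.
    """
    seconds, millis = divmod(ts_ms, 1000)
    frac = f"{millis:03d}" if pad else str(millis)
    text = f"{seconds}.{frac}"
    sec_str, frac_str = text.split(".")
    # Read the fraction as decimal digits of a second: "44" means 0.44 s.
    return int(sec_str) * 1000 + int(frac_str.ljust(3, "0"))


def render_utc(ts_ms):
    """Format epoch milliseconds as a UTC timestamp with millisecond precision."""
    seconds, millis = divmod(ts_ms, 1000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    moment += timedelta(milliseconds=millis)
    return moment.isoformat(sep=" ", timespec="milliseconds")


original = 1615846851044  # the sample value from the report
print(render_utc(roundtrip(original, pad=True)))   # 2021-03-15 22:20:51.044+00:00
print(render_utc(roundtrip(original, pad=False)))  # 2021-03-15 22:20:51.440+00:00
```

If the hypothesis holds, only values whose millisecond part is below 100 would be affected, which matches the observation that the change appears when the milliseconds begin with a 0.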