awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

Parser error but incomplete file location #194

Open · juliocpmelo opened this issue 1 year ago

juliocpmelo commented 1 year ago

Hello, I have been facing several parser errors while processing many CSV files with Glue. The environment is configured to run with Glue 4.0 and all the default configurations (I also tried 3.0). The ETL script I'm using is as follows:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
import re

# Script generated for node Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
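    # Derive year/month/day partition columns from the "ts" timestamp column
    # of the first (and only) frame in the incoming collection.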
    import pyspark.sql.functions as f
    from pyspark.sql.functions import to_timestamp, date_format

    df = dfc.select(list(dfc.keys())[0]).toDF()
    df_filtered_1 = df.withColumn("year", date_format(f.col("ts"), "yyyy"))
    df_filtered_2 = df_filtered_1.withColumn("month", date_format(f.col("ts"), "MM"))
    df_filtered_3 = df_filtered_2.withColumn("day", date_format(f.col("ts"), "d"))

    dyf_filtered = DynamicFrame.fromDF(df_filtered_3, glueContext, "add_index_columns")

    return DynamicFrameCollection(
        {"CustomTransformAddColumns": dyf_filtered}, glueContext
    )

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ['s3://path/with/many/csvs'],
        "recurse": True
    },
    transformation_ctx="S3bucket_node1",
)

def testRows(row):
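    # Keep rows whose serial_number is alphanumeric, whose ts starts with "2023-",
    # and whose parameter does not start with "connections".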
    return bool(re.match("^[A-Za-z0-9]+$", row["serial_number"])) and bool(re.match("^2023-.*", row["ts"])) and not bool(re.match("connections", row["parameter"]))

# Script generated for node Filter
Filter_node1692709557667 = Filter.apply(
    frame=S3bucket_node1,
    f=testRows,
    transformation_ctx="Filter_node1692709557667",
)

# Script generated for node Change Schema
ChangeSchema_node1692709669764 = ApplyMapping.apply(
    frame=Filter_node1692709557667,
    mappings=[
        ("id", "string", "id", "string"),
        ("created_at", "string", "created_at", "timestamp"),
        ("parameter", "string", "parameter", "string"),
        ("serial_number", "string", "serial_number", "string"),
        ("ts", "string", "ts", "timestamp"),
        ("value", "string", "value", "string"),
        ("device_model_id", "string", "device_model_id", "string"),
    ],
    transformation_ctx="ChangeSchema_node1692709669764",
)

# Script generated for node Custom Transform
CustomTransform_node1692709691084 = MyTransform(
    glueContext,
    DynamicFrameCollection(
        {"ChangeSchema_node1692709669764": ChangeSchema_node1692709669764}, glueContext
    ),
)

# Script generated for node Select From Collection
SelectFromCollection_node1692709727564 = SelectFromCollection.apply(
    dfc=CustomTransform_node1692709691084,
    key=list(CustomTransform_node1692709691084.keys())[0],
    transformation_ctx="SelectFromCollection_node1692709727564",
)

# Script generated for node Amazon S3
AmazonS3_node1692709906215 = glueContext.getSink(
    path="s3://dest/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["serial_number", "year", "month", "day"],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="AmazonS3_node1692709906215",
)
AmazonS3_node1692709906215.setCatalogInfo(
    catalogDatabase="dest-db", catalogTableName="dest"
)
AmazonS3_node1692709906215.setFormat("glueparquet")
AmazonS3_node1692709906215.writeFrame(SelectFromCollection_node1692709727564)
job.commit()

The structure of my S3 bucket is:

deviceId (e.g. 21134412AB)
|-- timestamp (e.g. 2023-05-10)
|   |-- data.csv

There are many folders, each representing a deviceId, with many timestamp folders inside. When running the ETL job, the process ends with the following error:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 29745 in stage 0.0 failed 4 times, most recent failure: Lost task 29745.3 in stage 0.0 (TID 30301) (172.34.138.241 executor 8): com.amazonaws.services.glue.util.FatalException: Unable to parse file: data.csv

As you can see, there is no information about which file is causing the error. The output should include at least the full path of "data.csv", something like "s3://bucket/21134412AB/2023-05-10/data.csv", so that I could locate and fix the offending file.
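
In the meantime, the only workaround I can think of is to probe each CSV object individually with the same Glue CSV reader, so that the failing key at least shows up in my own logs. A rough sketch of that idea is below (the bucket and prefix values are placeholders for my real ones, it reuses the glueContext from the script above, and iterating file by file is obviously slow when there are tens of thousands of objects):

import boto3

s3 = boto3.client("s3")
bucket = "my-bucket"               # placeholder: real bucket name goes here
prefix = "path/with/many/csvs/"    # placeholder: real key prefix goes here

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if not key.endswith(".csv"):
            continue
        path = f"s3://{bucket}/{key}"
        try:
            # Read this single file with the same Glue CSV reader and force evaluation
            dyf = glueContext.create_dynamic_frame.from_options(
                format_options={
                    "quoteChar": '"',
                    "withHeader": True,
                    "separator": ",",
                    "optimizePerformance": False,
                },
                connection_type="s3",
                format="csv",
                connection_options={"paths": [path]},
            )
            dyf.count()
        except Exception as e:
            print(f"Unable to parse {path}: {e}")

This at least narrows the failure down to a single key, but having the full path in the FatalException message itself would make this kind of scan unnecessary.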