awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

Got `pyspark.sql.utils.ParseException` error when reading from a DynamoDB table that has a field with `Map` type #147

Open knightazura opened 2 years ago

knightazura commented 2 years ago

Hello

I've created a Glue job that simply reads data from DynamoDB using the AWS Glue DynamoDB export connector as the source, does some transformation, and then writes the result to S3. But it failed with a ParseException error like this:

22/08/23 15:03:21 ERROR GlueExceptionAnalysisListener: [Glue Exception Analysis] {
    "Event": "GlueETLJobExceptionEvent",
    "Timestamp": 1661267001368,
    "Failure Reason": "Traceback (most recent call last):\n  File \"/tmp/test-dataset-creator.py\", line 40, in <module>\n    transformation_ctx=\"itemsDF\")\n  File \"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py\", line 234, in create_dynamic_frame_from_options\n    return source.getFrame(**kwargs)\n  File \"/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py\", line 36, in getFrame\n    jframe = self._jsource.getDynamicFrame()\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 117, in deco\n    raise converted from None\npyspark.sql.utils.ParseException: \nmismatched input '.105' expecting {<EOF>, '(', '.', '[', 'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MATCHED', 'MERGE', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TIME', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'ZONE', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, '+', '-', '*', '/', '%', '&', '|', '||', '^', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 17)\n\n== SQL 
==\nItem.tagsInBody.M.105.S as 105\n-----------------^^^\n",
    "Stack Trace": [
        {
            "Declaring Class": "deco",
            "Method Name": "raise converted from None",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
            "Line Number": 117
        },
        {
            "Declaring Class": "__call__",
            "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
            "Line Number": 1305
        },
        {
            "Declaring Class": "getFrame",
            "Method Name": "jframe = self._jsource.getDynamicFrame()",
            "File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py",
            "Line Number": 36
        },
        {
            "Declaring Class": "create_dynamic_frame_from_options",
            "Method Name": "return source.getFrame(**kwargs)",
            "File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py",
            "Line Number": 234
        },
        {
            "Declaring Class": "<module>",
            "Method Name": "transformation_ctx=\"itemsDF\")",
            "File Name": "/tmp/test-dataset-creator.py",
            "Line Number": 40
        }
    ],
    "Last Executed Line number": 40,
    "script": "test-dataset-creator.py"
}

After tinkering with the error, I found that a record with a Map-type field whose key is a number will cause the error.

Here's an example record in DynamoDB JSON form:

{
  "user_id": {
    "N": "674321"
  },
  "post_id": {
    "N": "23456"
  },
  "map_example": {
    "M": {
      "150": {
        "S": "One hundred fifty"
      }
    }
  }
}
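
For what it's worth, the same parse failure can be reproduced outside Glue with plain PySpark: Spark's SQL parser rejects an unquoted numeric segment such as `.150` in a column path or alias, which matches the generated expression `Item.tagsInBody.M.105.S as 105` in the log above. Below is a minimal sketch under that assumption; the DataFrame is made up to mirror the map_example field.

# Minimal repro sketch in plain PySpark (outside Glue); the DataFrame is a
# stand-in for the map_example field shown above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [({"150": "One hundred fifty"},)],
    "map_example map<string,string>",
)

# An unquoted numeric path segment or alias trips the SQL parser, giving the
# same "mismatched input" ParseException as the job log:
# df.selectExpr("map_example.150 as 150")   # raises pyspark.sql.utils.ParseException

# Quoting the numeric identifier with backticks parses fine:
df.selectExpr("map_example.`150` as `150`").show()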

I have two questions:

  1. Is there any way to skip reading unnecessary fields when creating the DynamicFrame? Incidentally, the problematic field is not necessary in my case, so I thought maybe it could be skipped. I have tried using a schema in format_options, but it didn't work.
  2. If question 1 is not possible, how can I resolve this error?

Thanks


Snippet to create DynamicFrame

# imports were elided in the original snippet; the gluetypes import below is
# assumed, since withSchema expects the Glue schema types (StructType, Field, ...)
from awsglue.gluetypes import StructType, Field, LongType, ArrayType, MapType

# thought using a schema would skip reading unnecessary fields; turns out it doesn't
schema = StructType([
  Field("postId", LongType()),
  Field("tags", ArrayType()),
  Field("subtags", ArrayType()),
  # Field("tagsInBody", MapType()),  # enabling this line didn't work either
])

itemsDF = glueContext.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": args['SOURCE_TABLE_ARN'],
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": args['TEMP_BUCKET'],
        "dynamodb.s3.prefix": "ddbexport/",
    },
    format="json",
    format_options={"withSchema": schema.jsonValue()},
    transformation_ctx="itemsDF")
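
One possible workaround to explore for question 1 (an assumption, not something verified in this thread): leave dynamodb.unnestDDBJson disabled so the connector never generates the numeric alias, then select only the attributes that are actually needed from the nested Item structure. A rough sketch, with the field paths assumed from the DynamoDB JSON shape:

# Sketch: read the export without unnesting, then keep only the needed fields.
itemsDF = glueContext.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": args['SOURCE_TABLE_ARN'],
        # unnesting stays off, so numeric map keys are never turned into aliases
        "dynamodb.unnestDDBJson": False,
        "dynamodb.s3.bucket": args['TEMP_BUCKET'],
        "dynamodb.s3.prefix": "ddbexport/",
    },
    format="json",
    transformation_ctx="itemsDF")

# The problematic tagsInBody map is simply never selected; the exact paths
# ("Item.postId" etc.) are assumptions about the export layout.
neededDF = itemsDF.select_fields(["Item.postId", "Item.tags", "Item.subtags"])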

ETL Job details

Glue Job Type: Spark ETL
Language: Python
Glue Version: 3.0

moomindani commented 2 years ago

Do you still see the same issue without format_options and schema? If so, can you paste the original full stacktrace instead of Glue Exception Analysis?

Meanwhile, an immediate workaround would be to use the traditional DynamoDB connector instead of the new DynamoDB export connector.
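
For reference, a minimal sketch of a read through the traditional (scan-based) connector; the table name and throughput values below are placeholders:

# Traditional DynamoDB connector read: scans the table directly instead of
# using an S3 export; values here are placeholders.
dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "test_table",
        "dynamodb.throughput.read.percent": "0.5",
        "dynamodb.splits": "10",
    },
    transformation_ctx="itemsDF",
)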

knightazura commented 2 years ago

Hello, thanks for your reply.

Do you still see the same issue without format_options and schema? If so, can you paste the original full stacktrace instead of Glue Exception Analysis?

Yes, I still get the issue without those parameters. Unfortunately, the log has been removed because it passed the retention period.

Meanwhile, an immediate workaround would be to use the traditional DynamoDB connector instead of the new DynamoDB export connector.

Actually, the reason we chose the new DynamoDB export connector is to avoid the traditional connector, which is more expensive.

moomindani commented 2 years ago

I have tested this but was not able to reproduce the issue. Can you compare this with your job run? At the very least, we need to reproduce the issue to identify the cause.

Test data in DynamoDB table:

{
  "user_id": {
    "S": "674321"
  },
  "map_example": {
    "M": {
      "150": {
        "S": "One hundred fifty"
      }
    }
  },
  "post_id": {
    "N": "23456"
  }
}

Test Glue script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:123456789101:table/test_table",
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "bucket_name",
        "dynamodb.s3.prefix": "ddbexport/",
    },
    format="json",
    transformation_ctx="itemsDF"
)

dyf.toDF().show()

Output:

+-------+--------------------+-----------------+-------+
|user_id|         map_example|              150|post_id|
+-------+--------------------+-----------------+-------+
| 674321|{{One hundred fif...|One hundred fifty|  23456|
+-------+--------------------+-----------------+-------+

knightazura commented 2 years ago

I have tested again using your script and still got the issue.

Test data

{
  "user_id": {
    "N": "222222"
  },
  "post_id": {
    "N": "222222"
  },
  "map_example": {
    "M": {
      "150": {
        "S": "One hundred fifty"
      }
    }
  }
}

May I know what version of Spark you are using?

Anyway, here's the full log.

moomindani commented 2 years ago

I tested it on Glue 3.0 / Spark 3.1.1. Where are you testing? Glue job? Local lib? Docker container? Which version?

knightazura commented 2 years ago

Tested from a Glue job. The version is the same as yours: Glue 3.0 / Spark 3.1.