aws-samples / data-engineering-for-aws-immersion-day

Lab Instructions for Data Engineering Immersion Day
MIT No Attribution

Glue Job - Convert from JSON to Parquet fails #26

Closed anwt-aws closed 2 years ago

anwt-aws commented 2 years ago

I have followed the instructions for the Stream ETL with Glue lab here: https://catalog.us-east-1.prod.workshops.aws/workshops/976050cc-0606-4b23-b49f-ca7b8ac4b153/en-US/300/330-streaming-lab

I completed everything up to [Create and trigger the Glue Streaming Job]. However, when I run the Glue streaming ETL job I get:

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):

No other error details are given. I've recreated the streams, jobs, etc. many times and still run into this issue. I've also tried changing the Data Catalog, which makes no difference.

If I set the S3 data target's Format to JSON rather than Parquet, the job runs successfully and the files are generated in the S3 bucket. As soon as it is set back to Parquet, the exception above is raised and no files are created.
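For context, the per-micro-batch write in the script Glue Studio generates for this lab presumably looks roughly like the sketch below (the names ETLJob.py, processBatch, S3bucket_node3 and KinesisStream_node1 all appear in the stack trace further down; the bucket, database and table values here are placeholders, and the exact arguments depend on the options chosen in the visual editor):

```python
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

def processBatch(data_frame, batchId):
    # Each Kinesis micro-batch arrives as a Spark DataFrame and is converted
    # back to a DynamicFrame before being written to the S3 data target.
    if data_frame.count() > 0:
        KinesisStream_node1 = DynamicFrame.fromDF(
            data_frame, glueContext, "from_data_frame"
        )
        # The visual editor's S3 target settings (Format, Compression Type,
        # catalog update behaviour) end up as arguments of this sink.
        S3bucket_node3 = glueContext.getSink(
            path="s3://<your-bucket>/data/",        # placeholder path
            connection_type="s3",
            updateBehavior="UPDATE_IN_DATABASE",
            partitionKeys=[],
            enableUpdateCatalog=True,
            transformation_ctx="S3bucket_node3",
        )
        S3bucket_node3.setCatalogInfo(
            catalogDatabase="<your-database>",      # placeholder
            catalogTableName="<your-table>",        # placeholder
        )
        S3bucket_node3.setFormat("glueparquet")     # Parquet target; "json" also works here
        S3bucket_node3.writeFrame(KinesisStream_node1)  # ETLJob.py line 50 in the trace
```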

Looking at the error logs in CloudWatch, I see this:

2022-09-29 09:34:35,456 ERROR [stream execution thread for [id = b09b51e0-dd19-49a1-ae93-82b650e49bc9, runId = 2c922e4d-a24c-41a3-925a-a5d4842c9c0f]] streaming.MicroBatchExecution (Logging.scala:logError(94)): Query [id = b09b51e0-dd19-49a1-ae93-82b650e49bc9, runId = 2c922e4d-a24c-41a3-925a-a5d4842c9c0f] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 653, in batch_function_with_persist
    batch_function(data_frame, batchId)
  File "/tmp/ETLJob.py", line 50, in processBatch
    S3bucket_node3.writeFrame(KinesisStream_node1)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 32, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o148.pyWriteDynamicFrame.
: java.lang.NullPointerException
    at java.lang.Class.isAssignableFrom(Native Method)
    at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2710)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputCompressorClass(FileOutputFormat.java:116)
    at com.amazonaws.services.glue.sinks.HadoopDataSink.$anonfun$writeDynamicFrame$2(HadoopDataSink.scala:201)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.$anonfun$executeWithQualifiedScheme$1(FileSchemeWrapper.scala:77)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:70)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:77)
    at com.amazonaws.services.glue.sinks.HadoopDataSink.$anonfun$writeDynamicFrame$1(HadoopDataSink.scala:157)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at com.amazonaws.services.glue.sinks.HadoopDataSink.writeDynamicFrame(HadoopDataSink.scala:151)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy54.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:244)

(The same error is then logged a second time, with the identical timestamp and stack trace.)

This seems to be the offending line: py4j.protocol.Py4JJavaError: An error occurred while calling o148.pyWriteDynamicFrame. : java.lang.NullPointerException

anwt-aws commented 2 years ago

So I've done some further debugging on this and have discovered that it seems to be "Compression Type: None" that causes the error. Setting the Compression Type to Snappy lets the job complete and write output files.
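That lines up with the stack trace: the NullPointerException is thrown from FileOutputFormat.setOutputCompressorClass, which is what you would expect if the sink is handed a null compressor class when Compression Type is None. In terms of the generated sink (a sketch, using the same placeholder bucket/database/table names as above), the workaround amounts to requesting a codec explicitly:

```python
# Workaround sketch: ask for Snappy explicitly instead of "Compression Type: None".
S3bucket_node3 = glueContext.getSink(
    path="s3://<your-bucket>/data/",      # placeholder, as in the sketch above
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",                 # the setting that avoids the NullPointerException
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node3",
)
S3bucket_node3.setCatalogInfo(
    catalogDatabase="<your-database>",    # placeholder
    catalogTableName="<your-table>",      # placeholder
)
S3bucket_node3.setFormat("glueparquet")
S3bucket_node3.writeFrame(KinesisStream_node1)
```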

I've also tried changing the GlueVersion to 2.0, which gives a slightly better error message when using Compression Type: None:

StreamingQueryException: 'An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 191, in call
    raise e
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 188, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 632, in batch_function_with_persist
    batch_function(data_frame, batchId)
  File "/tmp/ETLJob.py", line 50, in processBatch
    S3bucket_node3.writeFrame(KinesisStream_node1)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_fra

melodyyangaws commented 2 years ago

Thank you for reporting the Glue streaming issue. We have informed the relevant team, and a fix is in progress. You are absolutely right: for now, the workaround is to set the Compression Type to Snappy.