databricks / spark-redshift

Redshift data source for Apache Spark
Apache License 2.0

Pyspark timestamps cast issue when reading MySQL DB & writing to Redshift #435

Open foivosana opened 5 years ago

foivosana commented 5 years ago
Python 2.7
Pyspark 2.2.1
JDBC format for MySQL -> Spark DF
For writing Spark DF -> AWS Redshift I am using the `spark-redshift` driver from Databricks.

I am reading data into Spark from MySQL tables for my application. Depending on the input arguments, I need to get either all records that were updated before today, OR only the records that were updated up to and including the requested date.

df = (spark.read.format("jdbc")
      .option("url", "url")
      .option("driver", driver)
      .option("dbtable", query)
      .load())

and the query is

from datetime import date, timedelta

if days > 0:
    get_date = date.today() - timedelta(days)
    query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) >= DATE('{}') " \
            "AND CAST({}.updatedAt AS date) < CURDATE()) AS t".format(table, table, get_date, table)
elif days == 0:
    query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) < CURDATE() " \
            "OR updatedAt IS NULL) AS t".format(table, table)

The updatedAt timestamp column is discarded as soon as the data is read into Spark DataFrames, and the ETL contains no other timestamp-related manipulations. The final step is to write the manipulated records to AWS Redshift tables.

My problem is that SOMETIMES the application crashes with `Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp` when writing to Redshift. I guess the issue actually comes from the conversion at read time, and it is just Spark's lazy execution that surfaces the exception when writing to Redshift (there are no timestamp or date columns in the target Redshift tables whatsoever).
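One way I can think of to test this guess (just a sketch, using the df from the read snippet above) is to force the read to materialize immediately, so a read-side problem would surface before the Redshift write:

# Force the JDBC read to run right away; if the failure really originates
# at read time, the exception should show up here instead of at write time.
df.persist()
print(df.count())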

In the last month, across 4 different jobs that run daily, I got this exception in the logs about 15% of the time and the jobs failed; they run fine most of the time, which makes the issue impossible to reproduce or debug further.

I suspect the String -> Timestamp casting within the SQL query creates the problem, but I am not sure how I can achieve the same thing in another way that will not throw this exception. Any help greatly appreciated!
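For reference, a minimal sketch of an alternative I am considering (unverified; names taken from the snippets above): read the raw table through JDBC and do the date filtering on the Spark side, so no CAST is pushed down into the MySQL subquery:

from datetime import date, timedelta
from pyspark.sql import functions as F

# Read the whole table instead of the casted subquery.
df = (spark.read.format("jdbc")
      .option("url", "url")
      .option("driver", driver)
      .option("dbtable", table)
      .load())

if days > 0:
    get_date = date.today() - timedelta(days)
    df = df.filter((F.to_date("updatedAt") >= F.lit(str(get_date))) &
                   (F.to_date("updatedAt") < F.current_date()))
elif days == 0:
    df = df.filter((F.to_date("updatedAt") < F.current_date()) |
                   F.col("updatedAt").isNull())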

More stacktrace info:

py4j.protocol.Py4JJavaError: An error occurred while calling o827.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)

and

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 601 in stage 93.0 failed 4 times, most recent failure: Lost task 601.3 in stage 93.0 (TID 5282, url, executor 5): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:234)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:233)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:252)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:248)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
foivosana commented 5 years ago

UPDATE: I tried a lot of things with casting etc., but the problem still persists. Interestingly enough, I got a more informative stack trace at some point:

Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.sql.Timestamp is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, formid), StringType), true) AS column1#2498
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, formtitle), StringType), true) AS column2#2499
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, responserate), FloatType) AS column3#2500
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, languageid), IntegerType) AS column4#2512
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, answer), IntegerType) AS column5#2514
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, userid), IntegerType) AS column6#2516
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, inviteid), StringType), true) AS column7#2517
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, daterequested), StringType), true) AS column8#2518
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, dateanswered), StringType), true) AS column9#2519
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 30, customproperty9), StringType), true) AS customproperty9#2528
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 31, customproperty10), StringType), true) AS customproperty10#2529
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 32, updated), StringType), true) AS updated#2530
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:582)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:582)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
... 8 more
Caused by: java.lang.RuntimeException: java.sql.Timestamp is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr20$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_8$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)

It looks like it comes down to an issue with encoding the last timestamp column, but why are the TIMESTAMP columns parsed as StringType()?

Spark DataFrame schema that is being written:

root
|-- column1: string (nullable = true)
|-- column2: string (nullable = true)
|-- column3: float (nullable = true)
|-- column4: integer (nullable = true)
|-- column5: integer (nullable = true)
|-- column6: integer (nullable = true)
|-- column7: string (nullable = true)
|-- column8: timestamp (nullable = true)
|-- column9: timestamp (nullable = true)
|-- customProperty9: string (nullable = true)
|-- customProperty10: string (nullable = true)
|-- updated: timestamp (nullable = false)

The target Redshift table has the equivalent data types. Does anyone have an idea where this issue might come from?
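In case it helps narrow things down, this is the kind of defensive cast I am planning to try right before the write (a sketch, assuming the column names from the schema above):

from pyspark.sql import functions as F

# Explicitly cast the three timestamp columns so the DataFrame schema
# matches the Redshift target, regardless of what the ETL produced.
for ts_col in ["column8", "column9", "updated"]:
    df = df.withColumn(ts_col, F.col(ts_col).cast("timestamp"))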

foivosana commented 5 years ago

UPDATE #2: As a workaround I write the timestamps as strings instead; however, looking at the temp files of the intermediate results, it looks like there is an issue with the timestamp format. More specifically:

Using the AVRO temp file format, I indeed see in the schema specification that the Spark timestamp columns are parsed as strings, even though in the df schema they appear as timestamps, as shown above. The AWS Redshift documentation says the default timestamp format accepts fractional seconds up to 5 digits; however, this proved not to be true, in this case at least.

Setting TIMEFORMAT 'auto' in the extracopyoptions appears(?) to solve the issue locally, but since I am using AWS Glue for my Spark job I cannot be sure, as I cannot load all the data locally to check whether a malformed record creates the problem. I wouldn't bet on it though: I tried to cast all 3 columns to the same format with withColumn('column', F.to_timestamp('column', format="yyyy-MM-dd hh:mm:ss")) and this also didn't work, as the fractional milliseconds remain as well. What is strange is that, after the workaround, the columns containing the timestamps in string format do not seem to keep the fraction.
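For completeness, this is roughly how I am passing the option to the writer (a sketch; the connection settings are placeholders):

(df.write
   .format("com.databricks.spark.redshift")
   .option("url", redshift_jdbc_url)          # placeholder Redshift JDBC URL
   .option("dbtable", target_table)           # placeholder target table
   .option("tempdir", s3_temp_dir)            # placeholder S3 temp dir for the AVRO files
   .option("aws_iam_role", iam_role_arn)      # placeholder IAM role used by COPY
   .option("extracopyoptions", "TIMEFORMAT 'auto'")
   .mode("append")
   .save())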

Does this issue ring a bell for someone with more knowledge of the internals of the spark-redshift data source?

prakharjain-vogo commented 4 years ago

Me too.

My problem is that SOMETIMES the application crashes with a

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp

The strange thing is that it happens SOMETIMES!

harshitsinghai77 commented 1 year ago

Facing the same issue. The error I'm getting is `Timestamp format must be yyyy-mm-dd hh:mm:ss` when reading data from Redshift through PySpark.