databricks / spark-redshift

Redshift data source for Apache Spark

Error when writing to Redshift #378

Open HemanK opened 6 years ago

HemanK commented 6 years ago

Hi, I am getting the following error when trying to write to Redshift from EMR/Spark. I am able to read from Redshift successfully. I am using Spark 2.2.0 on EMR with the Databricks spark-redshift connector.

Appreciate any help to get this resolved quickly.

Caused by: java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String

See details below.

Thanks!

DETAILS 17/12/01 01:25:43 WARN Utils$: The S3 bucket XXXXX does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring tempdir to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html

[Stage 22:> (0 + 2) / 2]17/12/01 01:27:28 WARN TaskSetManager: Lost task 1.0 in stage 22.0 (TID 1234, ip-nnn-nnn-nnn.us-east-2.compute.internal, executor 11): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:299)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:314)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    ... 8 more

17/12/01 01:59:12 ERROR TaskSetManager: Task 1 in stage 3.0 failed 4 times; aborting job 17/12/01 01:59:12 ERROR FileFormatWriter: Aborting job null.

This is while trying to do the demo at AWS Big Data blog: Powering Amazon Redshift Analytics with Apache Spark and Amazon Machine Learning (https://aws.amazon.com/blogs/big-data/powering-amazon-redshift-analytics-with-apache-spark-and-amazon-machine-learning/#more-1340). My EMR cluster has Spark 2.2.0, and I invoked Spark as below:

======== spark-shell --jars spark-redshift_2.10-2.0.0.jar,/usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar,minimal-json-0.9.4.jar,spark-avro_2.11-3.0.0.jar

Here is the code snippet used for writing:

val s3TempDir2 = "s3:///predict_flight_delay_with_Spark/output2/"

flightsDF.write
    .format("com.databricks.spark.redshift")
    .option("temporary_aws_access_key_id", awsAccessKey)
    .option("temporary_aws_secret_access_key", awsSecretKey)
    .option("temporary_aws_session_token", token)
    .option("url", jdbcURL)
    .option("dbtable", "ord_flights_new")
    .option("aws_iam_role", "arn:aws:iam::xxxxxxxxx:role/RedShiftFullAccess")
    .option("tempdir", s3TempDir2)
    .mode(SaveMode.Overwrite)
    .save()

============
print(firstFlightsDF)
[2702961,10,2,2017-01-10,6,AA,1700,13,0,89,1,599]

flightsDF.printSchema()
root
 |-- id: long (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- fl_date: date (nullable = true)
 |-- days_to_holiday: integer (nullable = true)
 |-- unique_carrier: string (nullable = true)
 |-- fl_num: string (nullable = true)
 |-- dep_hour: string (nullable = true)
 |-- dep_del15: integer (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- flights: integer (nullable = true)
 |-- distance: integer (nullable = true)

flightsDF: org.apache.spark.sql.DataFrame = [id: bigint, day_of_month: int ... 10 more fields]
firstFlightsDF: org.apache.spark.sql.Row = [2702961,10,2,2017-01-10,6,AA,1700,13,0,89,1,599]

=============

HemanK commented 6 years ago

One additional question: I currently use the plain s3 URI syntax:
s3TempDir2 = "s3:///predict_flight_delay_with_Spark/output2/"

Should I be using s3n:///folder instead?

Note that my Redshift cluster version is 1.0.1499.

Thanks!

cjangrist commented 6 years ago

You have an extra slash in the s3 URI.

dhanashree25 commented 6 years ago

@HemanK - How did you get this working? I am stuck with the same error.

manmohan-puppala-bose commented 6 years ago

Were you able to fix this issue?

dhanashree25 commented 6 years ago

Yes, I had mismatched versions of the JAR files.
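
For reference, the important thing is that every artifact's Scala suffix and Spark lineage line up. A build.sbt sketch below shows the idea; the exact versions are an assumption, not a verified combination, so pick ones matching your own Spark and Scala build:

    // build.sbt sketch: let %% pick the Scala suffix so spark-redshift and spark-avro
    // cannot drift apart (e.g. _2.10 vs _2.11). Versions below are placeholders.
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "org.apache.spark"  %% "spark-sql"      % "2.2.0" % "provided",
      "com.databricks"    %% "spark-redshift" % "3.0.0-preview1",
      "com.databricks"    %% "spark-avro"     % "4.0.0",
      "org.apache.hadoop" %  "hadoop-aws"     % "2.7.4"
    )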

GabeChurch commented 6 years ago

I also got the

Caused by: java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;

error when trying to write to redshift from spark.

Would you mind sharing which jars ended up working for you? To get around this, I ended up using s3n to manually save the tables to S3, then manually created the table in Redshift and copied the data over from S3.
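
For anyone curious, that manual workaround looks roughly like the sketch below (bucket, table, and IAM role names are placeholders):

    // Rough sketch of the manual workaround: dump the DataFrame to S3 as CSV via s3n,
    // then run the COPY yourself from a SQL client. All names below are placeholders.
    df.write
      .mode("overwrite")
      .csv("s3n://my-temp-bucket/ord_flights_new/")

    // Then, after creating the table in Redshift, run:
    //   COPY ord_flights_new
    //   FROM 's3://my-temp-bucket/ord_flights_new/'
    //   IAM_ROLE 'arn:aws:iam::123456789012:role/RedShiftFullAccess'
    //   FORMAT AS CSV;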

I have tried Spark 2.1 through 2.3 with

"databricks % spark-redshift % 3.0.0-preview1",
    "org.apache.hadoop % hadoop-aws % 2.8.0",
    "com.amazonaws % aws-java-sdk % 1.7.4"
"databricks % spark-redshift % 2.0.1",
    "org.apache.hadoop % hadoop-aws % 2.7.3",
    "com.amazonaws % aws-java-sdk % 1.7.4"
"databricks % spark-redshift % 2.0.1",
    "org.apache.hadoop % hadoop-aws % 2.7.3",
    "com.amazonaws % aws-java-sdk % 1.7.4"

One more question, are you using IAM roles or access keys?

GabeChurch commented 6 years ago

I'll go ahead and post the solution I came up with. Basically, there are compatibility issues with certain hadoop-aws jars when writing to S3 with the s3a scheme. Switching to the s3n scheme works around this; the hadoop-aws 2.7.4 jar seems to be the consensus safest choice.

The aforementioned error also seems to be directly associated with an Avro formatting problem.

I was able to write to redshift with all of the aforementioned combinations when using the following code to write.

df.write
    .format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://yoururl$user=yourUser&password=your_password")
    .option("dbtable", "optionaldbname.tablename")
    .option("forward_spark_s3_credentials",true)
    .option("tempFormat", "CSV GZIP")  //default is avro, can also use CSV and CSV GZIP formats
    .option("tempdir", "s3n://myS3/path/to/bucket")
    .mode(SaveMode.Overwrite)
    .save()

Still interested in getting Avro to work.

daddalaramu commented 6 years ago

@HemanK - Can you please let me know how you resolved this issue?

I am getting a similar issue - my jar versions are spark-redshift_2.11-0.6.0.jar, RedshiftJDBC42-1.1.17.1017.jar, spark-avro_2.11-4.0.0.jar

Not sure what I am missing here.

manmohan-puppala-bose commented 6 years ago

Try using RedshiftJDBC41-1.2.15.1025.jar @daddalaramu

daddalaramu commented 6 years ago

Same error --

I am reading the data from a CSV file and writing into Redshift - not sure if I have any issue in the DataFrame syntax below:

sourceDF.write
    .format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://databasename:5439/biodrive?user=XXXXX&password=XXXXX")
    .option("dbtable", "dev.sample")
    .option("tempFormat", "CSV GZIP")
    .option("tempdir", "s3://foldername/temp/")
    .option("encryption", "true")
    .option("temporary_aws_access_key_id", "XXXXXX")
    .option("temporary_aws_secret_access_key", "XXXXX")
    .option("temporary_aws_session_token", "YYYYYY")
    .mode(SaveMode.Overwrite)
    .save()

lakshmi20 commented 5 years ago

@GabeChurch Were you able to get the avro format to work ?

foivosana commented 5 years ago

What if you just need the Redshift-S3 connection without any HDFS in place? I just need to write to Redshift from AWS Glue, but I keep getting Hadoop security/access-permission errors for the S3 Spark temp directory I have defined, even though I created the temp bucket with the same account!
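
One thing worth checking is that the credentials the job runs with are also visible to the s3n/s3a filesystem that handles the tempdir. Below is a minimal Scala sketch, assuming access-key auth rather than an instance/role profile; sc, df, jdbcUrl, accessKey, secretKey, and the bucket name are placeholders, and the config keys are the standard Hadoop ones:

    // Minimal sketch, assuming access-key credentials; all values are placeholders.
    // Make the same credentials visible to the Hadoop s3n filesystem used for the tempdir...
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)

    // ...and let the connector forward them to Redshift for the COPY/UNLOAD against the same bucket.
    df.write
      .format("com.databricks.spark.redshift")
      .option("url", jdbcUrl)
      .option("dbtable", "my_table")
      .option("forward_spark_s3_credentials", "true")
      .option("tempdir", "s3n://my-temp-bucket/spark-redshift-temp/")
      .mode(org.apache.spark.sql.SaveMode.Overwrite)
      .save()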

utgarda commented 5 years ago

In my case, using the current master-SNAPSHOT, adding the tempFormat option fixed it: .option("tempFormat", "CSV GZIP")

zbinkleytest commented 5 years ago

Has anyone been able to solve this in Spark 2.4.3? I am using the following jars and packages when running a local instance, and have no trouble reading from Redshift. However, I am getting the same error as above when writing (even when switching to CSV format).

spark = SparkSession.builder.master("local").appName("Test")\
    .config("spark.jars", 'RedshiftJDBC4-1.2.1.1001.jar,jets3t-0.9.0.jar,spark-avro_2.11-4.0.0.jar,hadoop-aws-2.7.4.jar')\
    .config("spark.jars.packages", 'com.databricks:spark-redshift_2.10:0.5.0,com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4')\
    .getOrCreate()