databricks / spark-avro

Avro Data Source for Apache Spark
http://databricks.com/
Apache License 2.0

Spark-avro Fails To Write DF in EMR 4.0 #91

Closed nadersalehi closed 7 years ago

nadersalehi commented 9 years ago

I have some simple code that works without a problem on my laptop (which uses the pre-built Spark 1.4.1 for Hadoop 2.4). When I run the same code on EMR 4.0, it crashes when it tries to execute the following line:

time_series_df.write.format('com.databricks.spark.avro').save(args.output_dir)

Stack trace is provided below:

2015-09-23 01:59:42,559 ERROR [Thread-3] sources.DefaultWriterContainer (Logging.scala:logError(75)) - Job job_201509230159_0000 aborted.
Traceback (most recent call last):
  File "/home/hadoop/./time_series_producer.py", line 336, in <module>
    main()
  File "/home/hadoop/./time_series_producer.py", line 333, in main
    .save(args.output_dir)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 304, in save
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o171.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:166)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:139)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 71, ip-172-31-36-39.ec2.internal): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
    at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
    at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
    at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
    at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
    at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
    at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
    ... 8 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

2015-09-23 01:59:42,591 INFO [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Lost task 5.3 in stage 11.0 (TID 76) on executor ip-172-31-36-38.ec2.internal: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 28]

JoshRosen commented 9 years ago

I think that the most informative part of this stacktrace is the following exception:

Caused by: java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;

Based on this, I suspect that your EMR environment is using a different Avro version than whatever version is used when you run locally. I'm not super-familiar with Spark in EMR; do you know which version of Spark is being used there?
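For readers unfamiliar with JVM method descriptors, the string after the method name in a NoSuchMethodError encodes the parameter and return types that the caller was compiled against. The following is a minimal sketch (the helper names `describe_method` and `_parse_type` are made up for illustration) that decodes such a descriptor into readable type names, which makes it easier to see exactly which overload is missing from the Avro jar loaded at runtime.

```python
# Primitive type codes used in JVM descriptors.
_PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean", "V": "void",
}


def _parse_type(desc, pos):
    """Parse one type starting at desc[pos]; return (readable_name, next_pos)."""
    dims = 0
    while desc[pos] == "[":  # each leading '[' adds one array dimension
        dims += 1
        pos += 1
    code = desc[pos]
    if code == "L":  # object type: L<internal/class/Name>;
        end = desc.index(";", pos)
        name = desc[pos + 1:end].replace("/", ".")
        pos = end + 1
    else:
        name = _PRIMITIVES[code]
        pos += 1
    return name + "[]" * dims, pos


def describe_method(descriptor):
    """Return (parameter_types, return_type) for a JVM method descriptor."""
    if not descriptor.startswith("("):
        raise ValueError("not a method descriptor: %r" % descriptor)
    pos = 1
    params = []
    while descriptor[pos] != ")":
        name, pos = _parse_type(descriptor, pos)
        params.append(name)
    return_type, _ = _parse_type(descriptor, pos + 1)
    return params, return_type


# The descriptor quoted in the stack trace above.
params, ret = describe_method(
    "(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;"
)
print(ret + " createDatumWriter(" + ", ".join(params) + ")")
```

In other words, the calling code expects `GenericData` to have a `createDatumWriter` method that takes a `Schema` and returns a `DatumWriter`, and the `GenericData` class that was actually loaded does not have it.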

JoshRosen commented 9 years ago

Also, just to clarify: which version of spark-avro are you using?

nadersalehi commented 9 years ago

EMR 4.0 uses Spark 1.4.1 on Hadoop 2.6 YARN. I use spark-avro version 2.0.1-s_2.10.

A few things to mention:

jaley commented 9 years ago

Hey @nadersalehi

We use spark-avro on EMR 4.0 quite a lot and also ran into this problem initially. It's just the usual dependency hell. Ironically, though the fact that Avro is used quite a bit throughout the Hadoop ecosystem seemed like an advantage at first, it's turned out to be one of the main causes of difficulties, as it means there are often many different versions polluting the classpath in different deployment environments.

We found that EMR 4.0 does in fact provide Avro 1.7.4, but it should still be possible to make spark-avro work without hacking the EMR environment. We embed spark-avro in our jar as a bundled dependency and it seems to take priority over the jar on the EMR environment. The cause of this problem for us was actually that transitive dependencies from other libraries we use were also pulling in conflicting Avro versions. Try using whatever build tool you have to visualise your dependency tree and find any other Avro versions being pulled in first.
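As a rough illustration of the "find other Avro versions" step, here is a small sketch that takes a list of jar paths (for example, collected from a build tool's dependency report or from listing the classpath directories on a node) and reports Avro artifacts that appear in more than one version. The function names and the third and fourth sample paths are hypothetical, and the file-name pattern is an assumption about how the jars are named; wildcard classpath entries such as `dir/*` would need to be expanded into file names first.

```python
import posixpath
import re
from collections import defaultdict

# Assumed naming scheme: avro[-module]-<version>[-classifier].jar,
# e.g. "avro-1.7.4.jar" or "avro-mapred-1.7.7-hadoop2.jar".
_AVRO_JAR = re.compile(r"^(avro(?:-[a-z]+)*)-(\d+(?:\.\d+)+)(?:-[\w.]+)?\.jar$")


def avro_versions(jar_paths):
    """Map each Avro artifact name to the sorted list of versions found."""
    found = defaultdict(set)
    for path in jar_paths:
        match = _AVRO_JAR.match(posixpath.basename(path))
        if match:
            found[match.group(1)].add(match.group(2))
    # Plain string sorting is enough for display purposes here.
    return {artifact: sorted(versions) for artifact, versions in found.items()}


def conflicting_avro_versions(jar_paths):
    """Keep only the artifacts that show up in more than one version."""
    return {
        artifact: versions
        for artifact, versions in avro_versions(jar_paths).items()
        if len(versions) > 1
    }


sample_paths = [
    "/usr/lib/hadoop/lib/avro-1.7.4.jar",
    "/usr/lib/spark/jars/avro-1.7.7.jar",
    "/home/hadoop/app/lib/avro-mapred-1.7.7-hadoop2.jar",  # hypothetical path
    "/home/hadoop/app/lib/spark-avro_2.10-2.0.1.jar",      # not an Avro core jar, ignored
]
print(conflicting_avro_versions(sample_paths))
```

Any artifact listed in the result is a candidate for the kind of classpath conflict described above.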

In any case, I don't think there's anything that can be done on the spark-avro side to improve this. This library necessarily needs to pull in Avro as a dependency and can't reasonably assume it'll be provided.

Hope that helps

nadersalehi commented 9 years ago

@jaley

Thanks for the clarification. It makes a lot of sense. I can verify that EMR 4.0 is using Avro 1.7.4 (1.7.5 in the case of Pig). Our application uses PySpark, so I am not sure how I could embed the correct version of spark-avro as a dependency. Any help would be appreciated.

nadersalehi commented 9 years ago

@JoshRosen & @jaley

I poked around a bit further and noticed that the problem stems from not having spark-avro on the executor nodes. As a workaround, I copied com.databricks_spark-avro_2.11-2.0.1.jar and org.apache.avro_avro-1.7.x.jar into /home/hadoop/.ivy2/jar, added the directory to SPARK_CLASSPATH and ran the application to completion.

I was under the impression that the '--packages' option takes care of distributing a given package, and its dependencies, to both driver and executor nodes, but this seems to happen only on the driver. While this is clearly not an Avro-specific problem, do you guys have any suggestions on how to address it before I close this issue?

Thanks, Nader

JoshRosen commented 9 years ago

@brkyvz, do you have any insights into what might be happening here with --packages?

brkyvz commented 9 years ago

@nadersalehi Just to double check: in your last message, you mentioned that you copied com.databricks_spark-avro_2.11-2.0.1 to the executors, but in your --packages command, you were using com.databricks:spark-avro_2.10:2.0.1. I wonder if this was simply a Scala version binary incompatibility problem.

nadersalehi commented 9 years ago

@brkyvz

Sorry, that was a typo; I actually copied com.databricks:spark-avro_2.10:2.0.1. The main issue here, IMHO, is that I don't see any spark-avro package on the executor nodes unless I manually copy them.

brkyvz commented 9 years ago

@JoshRosen @nadersalehi Then I think it may be an issue related to YARN being used with --packages. I'll look into it.

defilercompiler commented 9 years ago

@nadersalehi I'm having the same issue using the Scala shell on EMR 4.1.0: Avro 1.7.5 is needed, but EMR has 1.7.4. I'm trying the following:

spark-shell --jars avro-1.7.6.jar,spark-avro_2.11-2.0.1.jar,RedshiftJDBC41-1.1.7.1007.jar --packages org.apache.avro:avro:1.7.6,com.databricks:spark-avro_2.11:2.0.1,com.databricks:spark-redshift_2.10:0.5.1 --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true

But yeah, it's not picking them up... Did you have to put those in /home/hadoop/.ivy2/jar/ on all nodes?

alexnastetsky commented 9 years ago

Got it to work by simply deleting all Avro jars off the cluster. It picks up /usr/lib/spark/lib/spark-assembly.jar on the classpath, which contains both the avro and avro-mapred libs at version 1.7.7, so that's the only one I don't delete.

# delete jars from master node
find / -name "*avro*jar" 2> /dev/null -print0 | xargs -0 -I file sudo rm file
# delete jars from slave nodes (single quotes around the glob so it does not terminate the double-quoted ssh command)
yarn node -list | sed 's/ .*//g' | tail -n +3 | sed 's/:.*//g' | xargs -I node ssh node "find / -name '*avro*jar' 2> /dev/null -print0 | xargs -0 -I file sudo rm file"

defilercompiler commented 9 years ago

I confirm this fixes the problem. Thanks Alex. I tried just the first part (on master node), got frustrated and quit.

EMR 4.1.0 still has Avro 1.7.4 (IMO 1.7.5 is needed). I filed a ticket with Amazon; hopefully they'll include it in the next EMR version.

ejono commented 8 years ago

Hi, Jonathan from EMR here. Part of the problem is that Hadoop depends upon Avro 1.7.4, and the full Hadoop classpath is included in the Spark path. Do you think it might help for us to upgrade Hadoop to Avro 1.7.7 to match with Spark's dependency? Is Avro backward-compatible such that we can even expect that this shouldn't break anything in Hadoop?

eprochasson commented 8 years ago

Unfortunately, as spark-avro is also used in spark-redshift to write into the database, it seems that the whole thing is broken on EMR 4.* :(

eprochasson commented 8 years ago

Any workaround for this issue? Deleting all the Avro jars off the cluster works, but I couldn't find a way to automate it (bootstrap actions happen too early in the process).

defilercompiler commented 8 years ago

Yeah, writing doesn't work at all on EMR 4.*. Jonathan suggested setting spark.{driver,executor}.userClassPathFirst through the spark-defaults classification of the EMR cluster config. He wasn't sure it'd work, and I wasn't successful trying it out, but I didn't try too hard. If you make it work, let me know.

emlyn commented 8 years ago

We found a bit of a hacky workaround: we created a bootstrap action to copy the avro-1.7.7 jar into /usr/lib/hadoop-mapreduce; then, when Spark runs, this seems to take precedence over Hadoop's Avro jar. I'm not completely sure why this works, but in our (limited) testing it seems to be consistent, maybe because the new jar is written first, so it comes up first in the directory listing.

eprochasson commented 8 years ago

It seems to be working indeed! If anyone needs a quick solution, I uploaded the following script to S3 and successfully use it as a bootstrap action.

#!/bin/bash
# Get Avro jar and copy it into /usr/lib/hadoop-mapreduce

cd /tmp/
wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/avro/avro-1.7.7/java/avro-1.7.7.jar
sudo mkdir -p /usr/lib/hadoop-mapreduce
sudo cp avro-1.7.7.jar /usr/lib/hadoop-mapreduce

You might want to change the mirror to something closer to your instance.

emlyn commented 8 years ago

Just a heads up that the above hack stopped working for us on emr-4.2.0 - it picks up the old avro jar ahead of the new one on the classpath. But it's easy enough to work around - just put it in a different directory, and include that directory at the beginning of spark.{driver,executor}.extraClassPath.
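One way to express this as cluster configuration is through the spark-defaults classification mentioned earlier in the thread. The sketch below only builds the JSON; the directory `/home/hadoop/avro-override` and the function names are hypothetical, and it assumes you pass in the cluster's current extraClassPath value, since setting the property replaces whatever value was there before.

```python
import json


def prepend_classpath(existing, new_entry):
    """Return a ':'-separated classpath with new_entry first and no duplicates of it."""
    parts = [p for p in existing.split(":") if p and p != new_entry]
    return ":".join([new_entry] + parts)


def spark_defaults_classification(existing_classpath, avro_dir):
    """Build an EMR configuration list that puts avro_dir/* first for driver and executors."""
    value = prepend_classpath(existing_classpath, avro_dir + "/*")
    return [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.driver.extraClassPath": value,
                "spark.executor.extraClassPath": value,
            },
        }
    ]


# Shortened example value; a real cluster's default classpath is much longer.
current = "/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/lib/*"
config = spark_defaults_classification(current, "/home/hadoop/avro-override")
print(json.dumps(config, indent=2))
```

The jar itself would still have to be copied into that directory on every node, for example with a bootstrap action like the one above.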

tangfucius commented 8 years ago

@emlyn 's last solution worked for us for a while...until recently when this error popped up again, despite us using avro-1.7.7.jar (I checked the class path as mentioned here: https://github.com/databricks/spark-redshift/issues/129#issuecomment-160740317), so this really doesn't make much sense to me. We are on EMR 4.3.

veenits commented 8 years ago

I'm new to AWS and I'm kind of confused about the solution posted here. Can someone post the steps needed to load/bootstrap spark-avro on my EMR cluster so that I can use it in a Zeppelin notebook? (Since I am in an enterprise environment, I do not have access to the web to set it up on the cluster directly.)

JoshRosen commented 7 years ago

I think that this issue is somewhat out of scope / not actionable by us right now. If someone has a set of instructions which work and are easy to follow, then please submit a PR to add them to the README. I'm going to close this issue for now in order to declutter the issues backlog, but please re-open if new issues of this type start occurring.

jeanr84 commented 6 years ago

I just faced the same problem:

Caused by: java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;

I use the following versions: EMR 5.13.0, Spark 2.3.0.

In our Scala code we use this dependency:

lazy val sparkAvroDependency = libraryDependencies +=
      "com.databricks" %% "spark-avro" % "4.0.0"

I resolved the problem by changing the Avro jar referenced in /etc/spark/conf/spark-defaults.conf, replacing

spark.driver.extraClassPath      /usr/lib/hadoop/lib/avro-1.7.4.jar:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar
spark.executor.extraClassPath    /usr/lib/hadoop/lib/avro-1.7.4.jar:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar

with

spark.driver.extraClassPath      /usr/lib/spark/jars/avro-1.7.7.jar:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar
spark.executor.extraClassPath    /usr/lib/spark/jars/avro-1.7.7.jar:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar
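The only difference between the two versions is the first classpath entry. As a rough sketch of the same edit done programmatically, the function below (a hypothetical helper, operating on the file's text rather than on the file itself) swaps one entry in both extraClassPath properties and leaves every other line alone. The sample text is shortened for readability.

```python
CLASSPATH_KEYS = ("spark.driver.extraClassPath", "spark.executor.extraClassPath")


def swap_classpath_entry(conf_text, old_entry, new_entry):
    """Replace old_entry with new_entry in both extraClassPath properties."""
    result = []
    for line in conf_text.splitlines():
        fields = line.split(None, 1)  # property name, then the rest of the line
        if len(fields) == 2 and fields[0] in CLASSPATH_KEYS:
            entries = [
                new_entry if entry == old_entry else entry
                for entry in fields[1].strip().split(":")
            ]
            line = fields[0] + "    " + ":".join(entries)
        result.append(line)
    return "\n".join(result) + "\n"


# Shortened sample of spark-defaults.conf; the real classpath has many more entries.
sample_conf = (
    "spark.driver.extraClassPath      /usr/lib/hadoop/lib/avro-1.7.4.jar:/usr/lib/hadoop-lzo/lib/*\n"
    "spark.executor.extraClassPath    /usr/lib/hadoop/lib/avro-1.7.4.jar:/usr/lib/hadoop-lzo/lib/*\n"
    "spark.master                     yarn\n"
)
updated_conf = swap_classpath_entry(
    sample_conf,
    "/usr/lib/hadoop/lib/avro-1.7.4.jar",
    "/usr/lib/spark/jars/avro-1.7.7.jar",
)
print(updated_conf)
```

Writing the result back to /etc/spark/conf/spark-defaults.conf would need root privileges and would have to happen on every node that reads the file.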