spark-redshift-community / spark-redshift

Performant Redshift data source for Apache Spark
Apache License 2.0
137 stars 63 forks source link

Wrong FS error while import data in RedShift #103

Closed conker84 closed 2 years ago

conker84 commented 2 years ago

Hi everybody! I have a simple spark job that gets the data from Neo4j and tries to ingest into RedShift:

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", awss3accessid.get)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", awss3accessky.get)

val neo4jDF = ...

neo4jDF.write
      .format("io.github.spark_redshift_community.spark.redshift")
      .option("url", awsredshifturl.get)
      .option("dbtable", awsredshifttable.get)
      .option("tempdir", awss3tmpdir.get)
      .option("forward_spark_s3_credentials", "true")
      .save()

But I get the following error:

java.lang.IllegalArgumentException: Wrong FS s3://<my_bucket>//<dir>/manifest.json -expected s3a://<my_bucket>

    at org.apache.hadoop.fs.s3native.S3xLoginHelper.checkPath(S3xLoginHelper.java:224)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.checkPath(S3AFileSystem.java:1155)
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:666)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.makeQualified(S3AFileSystem.java:1117)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.qualify(S3AFileSystem.java:1143)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1318)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
    at io.github.spark_redshift_community.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:330)
    at io.github.spark_redshift_community.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:405)
    at io.github.spark_redshift_community.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:110)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)

Looking at the code it seems that the problem is related to these lines:

https://github.com/spark-redshift-community/spark-redshift/blob/1f12470735c17022ae2dd68d0963649577928dde/src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftWriter.scala#L313-L332

At L313 fs is defined on s3a protocol instead whereas manifestPath at L329 is defined on a sanitized version that relies on s3 protocol causing my error

Is that correct?

gios91 commented 2 years ago

Hi @conker84, I'm getting exactly the same error log

java.lang.IllegalArgumentException: Wrong FS s3://<bucket-name>//athena_query/64902755-120d-4f9f-8cb6-660b4c1099fb/manifest.json -expected s3a://<bucket-name>
        at org.apache.hadoop.fs.s3native.S3xLoginHelper.checkPath(S3xLoginHelper.java:224)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.checkPath(S3AFileSystem.java:1155)
        at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:666)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.makeQualified(S3AFileSystem.java:1117)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.qualify(S3AFileSystem.java:1143)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1318)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
        at io.github.spark_redshift_community.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:330)
        ...
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)

with following specs:

lazy val sparkDeps = Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql"   % sparkVersion % Provided,
  "org.apache.spark" %% "spark-mllib" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.hadoop" % "hadoop-aws" % "3.2.0" % Provided,
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.962" % Provided,
  "com.amazon.redshift" % "redshift-jdbc42" % "1.2.10.1009",
  "io.github.spark-redshift-community" %% "spark-redshift" % "5.0.4",
)

dependencyOverrides ++= Seq(
  "org.apache.hadoop" % "hadoop-aws" % "3.2.0"
)

when I try to call the this function (write DataFrame as Redshift table)

df.write
      .format("io.github.spark_redshift_community.spark.redshift")
      .option("url", s"jdbc:redshift://$redshiftHost:$port/$database?user=$username&password=$password")
      .option("dbtable", table)
      .option("tempdir", s3TempDirPath)
      .option(if(iamRole != null) "aws_iam_role" else "forward_spark_s3_credentials",
        if(iamRole != null) iamRole else "true")
      .mode(saveMode)
      .option("tempformat", "CSV GZIP")
      .option("extracopyoptions", "DELIMITER ','")
      .option("csvnullstring","")
    writer.save()

moreover, I have to say that

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY_ID)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)

In my idea, sanitizedTempDir could still been used because of Utils.removeCredentialsFromURI() which allows to remove credentials, but I don't understand why Utils.fixS3Url() which exactly does scala url.replaceAll("s3[an]://", "s3://") has to be used here while s3a protocol should be required (also because of S3A Hadoop configs).

Here is the change I propose to fix the bug.

Feel free to discuss or propose another solution. Thanks 👍

gios91 commented 2 years ago

@jsleight @lucagiovagnoli @88manpreet can you please support us on it? Many thanks!

conker84 commented 2 years ago

there is already a PR waiting for review @gios91

jsleight commented 2 years ago

I just merged #110 which should fix this.

JeremyBrent commented 1 year ago

Hi all and @conker84,

I am also getting the same error as above when writing redshift from PySpark.

  sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key)
  sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_key)
  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
  sc._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

  ...... 

  block_df.write.format("io.github.spark_redshift_community.spark.redshift"). \
        option("url", f"jdbc:redshift://{secrets['host']}:{secrets['port']}"
                      f"/{secrets['db']?user={secrets['username']}&password={secrets['password']}"). \
        option("dbtable", table). \
        option("tempdir", f's3a://{bucket}/{prefix}/tmp/'). \
        option("aws_iam_role", secrets['iam_role']). \
        mode("overwrite").save()

and I get the following:

Traceback (most recent call last):
  File "/home/ubuntu/spark_jobs/project/and/scripts/{script_name}.py", line 228, in <module>
    block_df.write.format("io.github.spark_redshift_community.spark.redshift"). \
  File "/opt/spark/spark-3.3.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 966, in save
  File "/opt/spark/spark-3.3.1-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/spark-3.3.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS s3://{bucket}//{prefix}/tmp/5b519ea1-00c0-48b0-98a7-64e10aa0a97d/manifest.json -expected s3a://{bucket}

I am using PySpark version 3.3.1 and scala version 2.12.15.

I downloaded the Spark-Redshift jar from the following, https://mvnrepository.com/artifact/io.github.spark-redshift-community/spark-redshift_2.12/5.1.0 (last updated Sep 30, 2022), which downloads spark-redshift_2.12-5.1.0-amzn-0.jar and I have it located in my SPARK_HOME.

Based on the date of the the PR associated with this issue (#110) and the last updated date of the maven repo, I assume the changes were included in the jars but I'm not 100% sure. I attemped to read the RedshiftWriter.class file using Javap but was not able to discern if the updates were included.

Let me know if you need more information. Any insight on the above would be much appreciated.

Thanks in advance! :)

jsleight commented 1 year ago

Hmmm, our changelog indicates the fix should be included in v5.1.0.

@JeremyBrent your traceback is slightly different than this issue though. I.e., your error is coming from python whereas the original error is coming from the jvm. I'm guessing maybe python intercepted the jvm exception and reformatted it?

Without the jvm stacktrace it is hard to tell what is going on unfortunately.

JeremyBrent commented 1 year ago

Hi @jsleight, thanks for getting back to me. Yeah I cannot seem to find the jvm exception in any of the logs.

The confusing part is that the job finishes with a success status (below comes from the spark events log file): {"Event":"SparkListenerJobEnd","Job ID":6,"Completion Time":1677096372200,"Job Result":{"Result":"JobSucceeded"}} The Spark History server shows that all executors completed their tasks successfully; all stages were marked as successful; all jobs were marked as successful; etc.

Theres no sign that this job failed (though it did) except for the small PySpark stack trace shown above and the lack of output data in Redshift.

Below is a stacktrace from the save at RedshiftWriter.scala:301 stage but there are no exceptions here (not sure if it'll provide any insight)

org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    io.github.spark_redshift_community.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:301)
    io.github.spark_redshift_community.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:403)
    io.github.spark_redshift_community.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:115)
    org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
JeremyBrent commented 1 year ago

Figured out my issue ....

I was incorrect in my initial assumption ("I downloaded the Spark-Redshift jar from the following, https://mvnrepository.com/artifact/io.github.spark-redshift-community/spark-redshift_2.12/5.1.0 (last updated Sep 30, 2022)")

We were using spark-redshift_2.12-5.1.0-amzn-0.jar which we obtained from an AWS EMR Cluster but needed to be using spark-redshift_2.12-5.1.0.jar from the link above. Once I updated the jar it worked fine.

Putting this here in case anyone runs into this issue in the future.

jsleight commented 1 year ago

Thanks for reporting back! cc @Brooke-white @kunaldeepsingh seems like there a few divergences between the open source and amazon jars.