spark-redshift-community / spark-redshift

Performant Redshift data source for Apache Spark
Apache License 2.0

Reading from a different region #130

Closed. MarcBarreiro closed this issue 1 year ago.

MarcBarreiro commented 1 year ago

Hello, I'm trying to read a table from Redshift using an S3 bucket located in a different region, but I'm getting this error:

[main] ERROR io.github.spark_redshift_community.spark.redshift.RedshiftRelation - The Redshift cluster and S3 bucket are in different regions (eu-west-1 and eu-west-3, respectively). Redshift's UNLOAD command requires that the Redshift cluster and Amazon S3 bucket be located in the same region, so this read will fail.

The function I'm using is:

import org.apache.spark.sql.{DataFrame, SparkSession}

private def readDataFromRedshift(
    spark: SparkSession, jdbcUrl: String, user: String, password: String, table: String,
    bucketName: String, accessKey: String, secretKey: String, s3region: String
  ): DataFrame = {

    // S3 credentials for the temp dir the connector unloads into
    spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

    val tempDir: String = s"s3a://${bucketName}"

    spark.read.format("io.github.spark_redshift_community.spark.redshift")
      .option("url", jdbcUrl)
      .option("forward_spark_s3_credentials", "true")
      .option("user", user)
      .option("password", password)
      .option("tempdir", tempDir)
      .option("tempdir_region", s3region)
      .option("temp_format", "CSV")
      .option("query", s"SELECT * FROM $table")
      .load()
  }

I've provided the tempdir_region parameter to indicate that the Redshift cluster and S3 bucket are in different regions. I've also tried setting the temp_format parameter to CSV because, according to the documentation, "Parquet should not be used as the tempformat when using an S3 bucket (tempdir) in a region that is different from the region where the redshift cluster you are writing to resides."

I've been able to read the table in my Azure Databricks cluster using:

private def readDataFromRedshift(
    spark: SparkSession, jdbcUrl: String, user: String, password: String, table: String,
    bucketName: String, accessKey: String, secretKey: String, s3region: String
  ): DataFrame = {

    spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

    val tempDir: String = s"s3a://${bucketName}"

    spark.read.format("com.databricks.spark.redshift")
      .option("url", jdbcUrl)
      .option("forward_spark_s3_credentials", "true")
      .option("user", user)
      .option("password", password)
      .option("tempdir", tempDir)
      .option("awsregion", s3region)
      .option("query", s"SELECT * FROM $table")
      .load()
  }

But I'm failing to read the table from my local PC, where I am using Scala 2.12. While investigating, I came across this project, which looks like the way to go for Scala 2.12. Am I missing something? Thanks a lot in advance!

jsleight commented 1 year ago

Cross-region loading is relatively new. @bsharifi, any idea what is happening?

bsharifi commented 1 year ago

@MarcBarreiro The error message you are receiving is from the older 5.1.0 version of the connector. Can you please double-check that you are using the newer 6.0.0 version of the connector?
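
If it helps, a quick way to confirm which connector jar is actually on the classpath (a generic JVM check, not part of the connector's API) is to ask where the class named in the error message was loaded from:

// Prints the location of the jar that RedshiftRelation was loaded from.
val cls = Class.forName("io.github.spark_redshift_community.spark.redshift.RedshiftRelation")
println(cls.getProtectionDomain.getCodeSource.getLocation)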

MarcBarreiro commented 1 year ago

Hi @bsharifi @jsleight, thanks for the answer. I was indeed using the 5.1.0 version. I've changed it, and this is what my pom.xml looks like now:

<dependency>
    <groupId>com.amazon.redshift</groupId>
    <artifactId>redshift-jdbc42</artifactId>
    <version>2.1.0.17</version>
</dependency>

<dependency>
    <groupId>io.github.spark-redshift-community</groupId>
    <artifactId>spark-redshift_2.12</artifactId>
    <version>6.0.0-spark_3.4</version>
</dependency>

But now I get a different error:

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.spark.sql.catalyst.plans.logical.UnaryNode, but interface was expected
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.UnaryOp$.unapply(UnaryOp.scala:29)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder.generateQueries(QueryBuilder.scala:151)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder.liftedTree1$1(QueryBuilder.scala:100)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder.treeRoot$lzycompute(QueryBuilder.scala:98)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder.treeRoot(QueryBuilder.scala:97)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder.tryBuild$lzycompute(QueryBuilder.scala:56)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder.tryBuild(QueryBuilder.scala:55)
    at io.github.spark_redshift_community.spark.redshift.pushdown.querygeneration.QueryBuilder$.getQueryFromPlan(QueryBuilder.scala:260)
    at io.github.spark_redshift_community.spark.redshift.pushdown.RedshiftStrategy.buildQueryRDD(RedshiftStrategy.scala:61)
    at io.github.spark_redshift_community.spark.redshift.pushdown.RedshiftStrategy.apply(RedshiftStrategy.scala:33)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:482)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:487)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:107)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:199)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:824)

The project I am working on uses Spark 3.0.1; is this a problem? Thanks in advance.

jsleight commented 1 year ago

Correct. spark-redshift v6.0.0 is compatible with either Spark v3.3 or Spark v3.4, so you'll need to upgrade your Spark version in order to use the cross-region capabilities. There are two released v6.0.0 artifacts, each built against a specific Spark version, so use the one that matches yours.
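
As a sketch, the Maven dependency would then be one of the following (the Spark 3.3 version string is an assumption based on the naming of the 3.4 artifact above):

<!-- for Spark 3.4 -->
<dependency>
    <groupId>io.github.spark-redshift-community</groupId>
    <artifactId>spark-redshift_2.12</artifactId>
    <version>6.0.0-spark_3.4</version>
</dependency>

<!-- for Spark 3.3 (assumed to follow the same naming pattern) -->
<dependency>
    <groupId>io.github.spark-redshift-community</groupId>
    <artifactId>spark-redshift_2.12</artifactId>
    <version>6.0.0-spark_3.3</version>
</dependency>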

MarcBarreiro commented 1 year ago

I will try to upgrade the Spark version, thanks for your help!
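
For reference, in my case that means bumping the Spark dependencies in the pom.xml to a 3.4.x release so they match the 6.0.0-spark_3.4 connector artifact, e.g. (the exact patch version is illustrative):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.4.1</version>
</dependency>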