vertica / spark-connector

This component acts as a bridge between Spark and Vertica, allowing the user to either retrieve data from Vertica for processing in Spark, or store processed data from Spark into Vertica.
Apache License 2.0

unable to access staging_fs_url using org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider in aws_credentials_provider option #552

Open akshay7023 opened 1 year ago

akshay7023 commented 1 year ago

Environment


Problem Description

The following options are used to read from the Vertica table: host, user, password, staging_fs_url, db, dbschema, table, aws_credentials_provider.
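For reference, a minimal sketch of how such a read might be configured from Spark; the format name and all option values below are placeholders and assumptions, not taken from the actual job:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("vertica-read-example").getOrCreate()

// Hypothetical option values; aws_credentials_provider points at the IAM instance profile provider.
val opts = Map(
  "host" -> "vertica.example.com",
  "user" -> "dbadmin",
  "password" -> "********",
  "db" -> "mydb",
  "dbschema" -> "public",
  "table" -> "my_table",
  "staging_fs_url" -> "s3a://my-bucket/spark-staging/",
  "aws_credentials_provider" -> "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider"
)

val df = spark.read
  .format("com.vertica.spark.datasource.VerticaSource") // the connector's DataSource V2 name
  .options(opts)
  .load()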


Spark Connector Logs

Caused by: java.sql.SQLSyntaxErrorException: [Vertica]VJDBC ERROR: Permission denied for storage location [s3a://##/##/##]
Stack trace:
  com.vertica.util.ServerErrorData.buildException(Unknown Source)
  com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
  com.vertica.dataengine.VResultSet.initialize(Unknown Source)
  com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
  com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
  com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
  com.vertica.jdbc.common.SStatement.executeNoParams(SStatement.java:3349)
  com.vertica.jdbc.common.SStatement.execute(SStatement.java:753)
  com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.$anonfun$execute$1(VerticaJdbcLayer.scala:315)
  scala.util.Try$.apply(Try.scala:213)
  com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.execute(VerticaJdbcLayer.scala:303)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$9(VerticaDistributedFilesystemReadPipe.scala:293)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$8(VerticaDistributedFilesystemReadPipe.scala:282)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$8$adapted(VerticaDistributedFilesystemReadPipe.scala:280)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$7(VerticaDistributedFilesystemReadPipe.scala:280)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$6(VerticaDistributedFilesystemReadPipe.scala:268)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$4(VerticaDistributedFilesystemReadPipe.scala:261)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$3(VerticaDistributedFilesystemReadPipe.scala:259)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.$anonfun$doPreReadSteps$2(VerticaDistributedFilesystemReadPipe.scala:257)
  scala.util.Either.flatMap(Either.scala:341)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.exportData$1(VerticaDistributedFilesystemReadPipe.scala:254)
  com.vertica.spark.datasource.core.VerticaDistributedFilesystemReadPipe.doPreReadSteps(VerticaDistributedFilesystemReadPipe.scala:345)
  com.vertica.spark.datasource.core.DSReadConfigSetup.performInitialSetup(DSConfigSetup.scala:697)
  com.vertica.spark.datasource.core.DSReadConfigSetup.performInitialSetup(DSConfigSetup.scala:650)
  com.vertica.spark.datasource.v2.VerticaScan.planInputPartitions(VerticaDatasourceV2Read.scala:228)
  org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions$lzycompute(BatchScanExec.scala:54)
  org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions(BatchScanExec.scala:54)
  org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:142)
  org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:141)
  org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:36)
  org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:143)
  org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
  scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
  org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
  org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
  scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
  scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
  scala.collection.Iterator.foreach(Iterator.scala:943)
  scala.collection.Iterator.foreach$(Iterator.scala:943)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
  scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
  scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
  org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
  scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
  org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:495)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:153)
  org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
  org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
  org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:153)
  org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:146)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
  org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
  org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
  org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
  org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:159)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:298)
  org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657)
  org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:298)
  org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313)
  org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267)
  org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246)
  org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
  org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId

romanperesypkin commented 11 months ago

Hi. Got the same issue today on Vertica 10.0.1, connector 3.3.5, and Spark (MapR) 3.3.2.

It seems the Vertica connector should swap "s3a" for "s3" in the COPY operation.

I ran tests against Vertica via DBeaver:
COPY test.user FROM 's3://stage/tmp/*.parquet' PARQUET -> works
COPY test.user FROM 's3a://stage/tmp/*.parquet' PARQUET -> permission denied

The connector log shows that the COPY operation uses s3a:

23/10/17 21:56:46 INFO VerticaDistributedFilesystemWritePipe: The copy statement is:
COPY "test"."user" ("language","users_count") FROM 's3a://stage/b2d0a0df_b22e_4e35_82b4_a58f44a1e0ba/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "ttt_b2d0a0df_b22e_4e35_82b4_a58f44a1e0ba_COMMITS" NO COMMIT

I will try to build my own version and test it.

romanperesypkin commented 11 months ago

My test works fine. It seems the Vertica COPY operation does not understand "s3a".

All I did for the test was var newUrl = url.replace("s3a", "s3") and used that in place of $url.
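To make it concrete, a rough sketch of the change; the helper and variable names are hypothetical, and this swaps only the URL scheme (a slightly safer variant of the plain replace above) before the URL goes into the COPY statement:

// Hypothetical helper: rewrite the s3a:// scheme to s3:// before the staging URL
// is interpolated into the COPY ... FROM '<url>' statement.
def toVerticaS3Url(stagingUrl: String): String =
  if (stagingUrl.startsWith("s3a://")) "s3://" + stagingUrl.stripPrefix("s3a://")
  else stagingUrl

// Applied to the URL from the copy statement in the log above:
val url = "s3a://stage/b2d0a0df_b22e_4e35_82b4_a58f44a1e0ba/*.parquet"
val copyStatement =
  s"""COPY "test"."user" ("language","users_count") FROM '${toVerticaS3Url(url)}' ON ANY NODE parquet"""
// copyStatement now references 's3://stage/...', which Vertica accepted in the DBeaver test.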

Any help from the devs would be great, as I do not know the details.