GoogleCloudDataproc / spark-bigtable-connector


Cannot write dataframe to Cloud Bigtable using PySpark connector: key not found: BigtableClientKey #32

Closed: rcardial87 closed this issue 2 months ago

rcardial87 commented 3 months ago

Hi,

I followed the Spark connector documentation at https://github.com/GoogleCloudDataproc/spark-bigtable-connector and am getting an error while writing to Bigtable using Spark.

spark.version = 3.5.0
Scala code runner version 2.12.18
bigtable_spark_connector_jar = "gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"

The script creates the table, but no data is loaded into it.

Error trace:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2415)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2436)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2455)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2480)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1036)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1034)
  at com.google.cloud.spark.bigtable.BigtableRelation.insert(BigtableDefaultSource.scala:162)
  at com.google.cloud.spark.bigtable.BigtableDefaultSource.createRelation(BigtableDefaultSource.scala:76)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:473)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:473)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:449)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.NoSuchElementException: key not found: BigtableClientKey(
    projectId = , instanceId = , appProfileId = default, emulatorPort = None,
    batchMutateFlowControl = false, readRowsAttemptTimeout = None, readRowsTotalTimeout = None,
    mutateRowsAttemptTimeout = None, mutateRowsTotalTimeout = None, batchSize = 100,
    userAgentText = spark-bigtable_2.12/0.1.0 spark/3.5.0 data source/V1 scala/2.12.18
  )
  at scala.collection.MapLike.default(MapLike.scala:236)
  at scala.collection.MapLike.default$(MapLike.scala:235)
  at scala.collection.AbstractMap.default(Map.scala:65)
  at scala.collection.mutable.HashMap.apply(HashMap.scala:69)
  at com.google.cloud.spark.bigtable.datasources.BigtableDataClientBuilder$.getHandle(BigtableDataClientBuilder.scala:72)
  at com.google.cloud.spark.bigtable.BigtableRelation.$anonfun$insert$2(BigtableDefaultSource.scala:164)
  at com.google.cloud.spark.bigtable.BigtableRelation.$anonfun$insert$2$adapted(BigtableDefaultSource.scala:162)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
  at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2455)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
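
For reference, the write path follows the connector README. A minimal sketch of such a write looks like the following (the table, column, project, and instance names are placeholders, not the real ones from my job):

```python
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data; the real job writes its own DataFrame.
df = spark.createDataFrame([("word1", 10), ("word2", 20)], ["word", "count"])

# Catalog mapping DataFrame columns to a Bigtable row key and column family.
catalog = json.dumps({
    "table": {"name": "my_table"},  # placeholder table name
    "rowkey": "wordCol",
    "columns": {
        "word": {"cf": "rowkey", "col": "wordCol", "type": "string"},
        "count": {"cf": "example_family", "col": "countCol", "type": "long"},
    },
})

(
    df.write.format("bigtable")
    .options(catalog=catalog)
    .option("spark.bigtable.project.id", "my-project")    # placeholder
    .option("spark.bigtable.instance.id", "my-instance")  # placeholder
    .option("spark.bigtable.create.new.table", "true")
    .save()
)
```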

rkaregar commented 3 months ago

Hi, are you including a logging framework, e.g., reload4j, when submitting the job? You can pass --packages=org.slf4j:slf4j-reload4j:1.7.36 when submitting jobs locally; there is more information here:

https://github.com/GoogleCloudDataproc/spark-bigtable-connector/blob/main/examples/python/README.md
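
For example, a local submission would look roughly like this (the script name is a placeholder; the JAR path is the one from your report):

```
spark-submit \
  --jars gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  your_script.py
```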

If this doesn't work, can you please share more details about your environment and the command you are using to submit the job?

rcardial87 commented 2 months ago

Hi, I am not using reload4j; I will try it. About the environment: the process runs on a Dataproc cluster with Debian image 2.2.

rcardial87 commented 2 months ago

After including the package with --packages=org.slf4j:slf4j-reload4j:1.7.36, the process works fine. Thanks!!
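
In case it helps anyone else on Dataproc, the submission looked roughly like the following (cluster, region, and script names are placeholders; on Dataproc the extra package can be passed through the spark.jars.packages property instead of a --packages flag):

```
gcloud dataproc jobs submit pyspark your_script.py \
  --cluster=your-cluster \
  --region=your-region \
  --jars=gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar \
  --properties=spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36
```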

rkaregar commented 2 months ago

Glad to hear the problem is resolved, please feel free to reach out to us if you run into any other issues!