vertica / spark-connector

This component acts as a bridge between Spark and Vertica, allowing the user to either retrieve data from Vertica for processing in Spark, or store processed data from Spark into Vertica.
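
For context, a minimal read through the connector looks roughly like the sketch below (hedged: the host, credentials, table name, and staging path are placeholders; the format string and option names are the ones that appear later in this issue):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("vertica-connector-example").getOrCreate()

# Sketch of a basic read from Vertica through the connector; all option
# values are placeholders for your environment. The connector stages data
# on an intermediate filesystem, hence staging_fs_url.
df = (spark.read
      .format("com.vertica.spark.datasource.VerticaSource")
      .option("host", "vertica-host")
      .option("user", "dbadmin")
      .option("password", "my-password")
      .option("db", "mydb")
      .option("table", "mytable")
      .option("staging_fs_url", "hdfs:///tmp/vertica-staging")
      .load())
df.show()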

[BUG] Could not get external table schema: String index out of range: -1 #475

Closed KevinAppelBofa closed 2 years ago

KevinAppelBofa commented 2 years ago

Environment


Problem Description

Trying to create an external table from existing data results in an error.

Caused by: com.vertica.spark.util.error.ConnectorException: Could not get external table schema: String index out of range: -1
  at com.vertica.spark.util.error.ErrorHandling$.logAndThrowError(ErrorHandling.scala:78)
  at com.vertica.spark.datasource.v2.VerticaBatchWrite.commit(VerticaDatasourceV2Write.scala:92)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:369)
  ... 44 more
Suppressed: com.vertica.spark.util.error.ConnectorException: Writing job aborted. Check spark worker log for specific error.
  at com.vertica.spark.util.error.ErrorHandling$.logAndThrowError(ErrorHandling.scala:78)
  at com.vertica.spark.datasource.v2.VerticaBatchWrite.abort(VerticaDatasourceV2Write.scala:104)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:376)
  ... 44 more

  1. Steps to reproduce: Create an empty-schema df, write the df using create_external_table="existing-data", and point the staging_fs_url option at the existing data
  2. Expected behaviour: The external table is created
  3. Actual behaviour: Error because the external table schema cannot be retrieved
  4. Error message/stack trace: See above
  5. Code sample or example on how to reproduce the issue: Just trying to do the basic write from the example. Another dataset works fine; only this specific dataset produces the "Could not get external table schema: String index out of range: -1" message
  6. If I go into vsql and run the select_infer_schema command and copy/paste the DDL, then it loads fine

Spark Connector Logs

Aryex commented 2 years ago

Thanks, Kevin. We'll take a look at this.

Aryex commented 2 years ago

Hi @KevinAppelBofa

Could you show us the full error log? This error should be happening on the driver node so the logs should be located there.

Also, what were the connector options used and their values? Do you know the schema of the dataset?

KevinAppelBofa commented 2 years ago

This is the output; I'm not seeing what the actual failure is.

22/08/08 15:52:48 ERROR v2.VerticaBatchReader: Exception while retrieving external table schema.

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -1
Stack trace:
java.lang.String.substring(String.java:1960)
com.vertica.spark.util.schema.SchemaTools.$anonfun$inferExternalTableSchema$1(SchemaTools.scala:657)
scala.collection.immutable.List.map(List.scala:297)
com.vertica.spark.util.schema.SchemaTools.inferExternalTableSchema(SchemaTools.scala:654)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.inferExternalTableSchema(VerticaDistributedFilesystemWritePipe.scala:293)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$commitDataAsExternalTable$2(VerticaDistributedFilesystemWritePipe.scala:517)
scala.util.Either.flatMap(Either.scala:341)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.commitDataAsExternalTable(VerticaDistributedFilesystemWritePipe.scala:507)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.commit(VerticaDistributedFilesystemWritePipe.scala:555)
com.vertica.spark.datasource.core.DSWriter.commitRows(DSWriter.scala:102)
com.vertica.spark.datasource.v2.VerticaBatchWrite.commit(VerticaDatasourceV2Write.scala:91)
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:369)
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:330)
org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:254)
org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:309)
org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:308)
org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:254)
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)
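
For what it's worth, the top of that trace points at java.lang.String.substring being called inside SchemaTools.inferExternalTableSchema, which is the usual indexOf-returns--1 pattern: if the column-type string being parsed does not contain the expected marker, indexOf yields -1 and Java's substring throws exactly "String index out of range: -1". A rough Python sketch of that failure mode (the marker and helper name below are hypothetical, not the connector's actual code):

# Hypothetical illustration of the indexOf(-1) failure mode seen in the trace;
# the real parsing lives in SchemaTools.scala and is not reproduced here.
def column_ddl_after_marker(col_def: str, marker: str) -> str:
    idx = col_def.find(marker)  # Java: colDef.indexOf(marker)
    if idx < 0:
        # Java's String.substring(-1) raises StringIndexOutOfBoundsException;
        # Python's find() returns -1 silently, so raise explicitly to mirror it.
        raise IndexError("String index out of range: -1")
    return col_def[idx + len(marker):]

column_ddl_after_marker("col1 int", "varchar")  # raises, like the trace above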

from pyspark.sql.types import StructType

# Empty-schema DataFrame; the connector is expected to infer the schema
# from the existing parquet data under staging_fs_url.
df = spark.createDataFrame([], StructType([]))
df.write.mode('overwrite').save(
    format="com.vertica.spark.datasource.VerticaSource",
    host=host, user=user, password=password, db=db,
    dbschema="myschema",
    staging_fs_url="hdfs:///location_to_the_parquet/mydataset.parquet",
    create_external_table="existing-data",
    table="ktest1")

The schema has a mix of string, int, and decimal columns; there are 56 columns and 254MM rows, and the underlying data is not written with partitionBy.
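
To double-check the types Spark sees for that file, one quick (hedged) sketch is to read the parquet directly and print its schema; the path below is the same staging_fs_url used in the write call above:

# Inspect the schema of the existing parquet data directly in Spark.
src = spark.read.parquet("hdfs:///location_to_the_parquet/mydataset.parquet")
src.printSchema()        # should show the mix of string, int and decimal columns
print(len(src.columns))  # expecting 56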

Aryex commented 2 years ago

@KevinAppelBofa We just opened a PR addressing this issue. Once merged it'll be included in a patch release. Let me know if you have any questions.