exasol / spark-connector

A connector for Apache Spark to access Exasol
Apache License 2.0

Several subconnections created with read+write RDD operations #195

Open Shmuma opened 1 year ago

Shmuma commented 1 year ago

While investigating https://github.com/exasol/integration-tasks/issues/224, I found the following corner case:

If we read from Exasol and then write the data back to the same database, the spark-connector creates several subconnections from the same worker, which makes everything fail.

To reproduce:

  1. create a table with a single row
  2. the DB must have two data nodes
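For step 1, a minimal one-row table can be created directly over JDBC. This is a hypothetical setup sketch, not part of the original report: it assumes the Exasol JDBC driver is on the classpath and reuses the host, credentials, and `TEST.TEST` table name from the repro below.

```scala
// Hypothetical setup sketch for step 1 (assumes the Exasol JDBC driver is
// available and the same host/credentials as in the repro code below).
import java.sql.DriverManager

val conn = DriverManager.getConnection(
  "jdbc:exa:10.0.0.12:8563;validateservercertificate=0", "sys", "PASS")
try {
  val stmt = conn.createStatement()
  stmt.execute("CREATE SCHEMA IF NOT EXISTS TEST")
  stmt.execute("CREATE OR REPLACE TABLE TEST.TEST (C1 INT)")
  stmt.execute("INSERT INTO TEST.TEST VALUES (1)") // exactly one row
} finally conn.close()
```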

Read the data (without caching the RDD):

sc.setLogLevel("INFO")

val exasolDF = spark
      .read
      .format("exasol")
      .option("host", "10.0.0.12")
      .option("port", "8563")
      .option("username", "sys")
      .option("password", "PASS")
      .option("query", "select * from TEST.TEST")
      .option("jdbc_options", "validateservercertificate=0")
      .load()

Write it back:

exasolDF
        .write
        .mode("append")
        .format("exasol")
        .option("host", "10.0.0.12")
        .option("port", "8563")
        .option("username", "sys")
        .option("password", "PASS")
        .option("jdbc_options", "validateservercertificate=0")
        .option("table", "TEST.TEST")
        .save()

The write then fails with a connection error:

23/08/25 08:39:07 INFO ExasolRDD: Sub connection with url = jdbc:exa-worker:10.0.0.11:20799;workerID=0;workertoken=1600439801397059506;validateservercertificate=0 and handle = 1
23/08/25 08:39:07 INFO ExasolRDD: Sub connection with url = jdbc:exa-worker:10.0.0.12:20047;workerID=1;workertoken=1600439801397059506;validateservercertificate=0 and handle = 1
23/08/25 08:39:07 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa-worker:10.0.0.12:20047;workerID=1;workertoken=1600439801397059506;validateservercertificate=0
23/08/25 08:39:07 INFO ExasolConnectionManager: Cache miss, create new connection
23/08/25 08:39:07 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa-worker:10.0.0.11:20799;workerID=0;workertoken=1600439801397059506;validateservercertificate=0
23/08/25 08:39:07 INFO ExasolConnectionManager: Cache miss, create new connection
23/08/25 08:39:07 INFO CodeGenerator: Code generated in 52.345884 ms
23/08/25 08:39:07 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa-worker:10.0.0.12:20641;workerID=1;workertoken=8082145199737655285;validateservercertificate=0
23/08/25 08:39:07 INFO ExasolConnectionManager: Cache miss, create new connection
23/08/25 08:41:11 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa-worker:10.0.0.11:20241;workerID=0;workertoken=8082145199737655285;validateservercertificate=0
23/08/25 08:41:11 INFO ExasolConnectionManager: Cache miss, create new connection
23/08/25 08:41:11 INFO ExasolRDD: Closed a sub connection
23/08/25 08:41:11 INFO ExasolRDD: Closed a sub connection
23/08/25 08:41:11 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
com.exasol.jdbc.ConnectFailed: Connection was lost and could not be reestablished.  (SessionID: 1775189618921242625)
        at com.exasol.jdbc.AbstractEXAConnection.reconnect(AbstractEXAConnection.java:4233)
        at com.exasol.jdbc.ServerCommunication.handle(ServerCommunication.java:294)
        at com.exasol.jdbc.AbstractEXAConnection.communication(AbstractEXAConnection.java:3139)
        at com.exasol.jdbc.AbstractEXAConnection.communication_resultset(AbstractEXAConnection.java:2844)
        at com.exasol.jdbc.AbstractEXAPreparedStatement.<init>(AbstractEXAPreparedStatement.java:61)
        at com.exasol.jdbc.EXAPreparedStatement.<init>(EXAPreparedStatement.java:14)
        at com.exasol.jdbc.DialectGeneric.createPreparedStatement(DialectGeneric.java:10)
        at com.exasol.jdbc.AbstractEXAConnection.prepareStatement(AbstractEXAConnection.java:1099)
        at com.exasol.spark.writer.ExasolWriter.insertPartition(ExasolWriter.scala:79)
        at com.exasol.spark.DefaultSource.$anonfun$saveDataFrame$1(DefaultSource.scala:161)
        at com.exasol.spark.DefaultSource.$anonfun$saveDataFrame$1$adapted(DefaultSource.scala:161)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1009)

This happens only when there is a single row in the underlying table, so my assumption is that the connection is closed too early for the empty partition.
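If the early-close hypothesis holds, one possible mitigation is to materialize the read result before writing it back, so the read's subconnections are finished before the write opens its own. This is an untested workaround sketch, not a confirmed fix; it reuses the `exasolDF` DataFrame and the connection options from the repro above.

```scala
// Hypothetical workaround sketch -- not a confirmed fix.
// Materialize the read result first, so the read's subconnections are
// closed before the write creates its own set.
val materialized = exasolDF.persist() // cache the single-row result
materialized.count()                  // force the read to complete

materialized
  .write
  .mode("append")
  .format("exasol")
  .option("host", "10.0.0.12")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "PASS")
  .option("jdbc_options", "validateservercertificate=0")
  .option("table", "TEST.TEST")
  .save()

materialized.unpersist()
```

This only sidesteps the bug by ensuring the read and write phases do not hold worker subconnections at the same time; the underlying lifecycle issue in `ExasolRDD` would still need a fix in the connector.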