exasol / spark-connector

A connector for Apache Spark to access Exasol
Apache License 2.0

Read race condition with several RDD operations in one job #200

Closed Shmuma closed 1 year ago

Shmuma commented 1 year ago

With several data nodes in the database installation, reading from two tables in one job ends up in a "connection refused" error:

23/08/28 08:15:36 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa-worker:10.0.0.12:20513;workerID=1;workertoken=5648301064778968824;validateservercertificate=0
23/08/28 08:15:36 INFO ExasolConnectionManager: Cache miss, create new connection
23/08/28 08:15:36 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 5)
com.exasol.jdbc.ConnectFailed: java.net.ConnectException: Connection refused
        at com.exasol.jdbc.AbstractEXAConnection.connectAndLogin(AbstractEXAConnection.java:2373)
        at com.exasol.jdbc.AbstractEXAConnection.setupConnection(AbstractEXAConnection.java:1646)
        at com.exasol.jdbc.AbstractEXAConnection.Connect(AbstractEXAConnection.java:1512)
        at com.exasol.jdbc.AbstractEXAConnection.<init>(AbstractEXAConnection.java:550)
        at com.exasol.jdbc.EXAConnection.<init>(EXAConnection.java:37)
        at com.exasol.jdbc.EXADriver.connect(EXADriver.java:225)

To reproduce, a database with more than one data node is needed. The table TEST.TEST has some rows:

sc.setLogLevel("INFO")

val df1 = spark
  .read
  .format("exasol")
  .option("host", "10.0.0.12")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "PASS")
  .option("query", "select * from TEST.TEST")
  .option("jdbc_options", "validateservercertificate=0")
  .load()

val df2 = spark
  .read
  .format("exasol")
  .option("host", "10.0.0.12")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "PASS")
  .option("query", "select * from TEST.TEST")
  .option("jdbc_options", "validateservercertificate=0")
  .load()

val df_u = df1.union(df2)
df_u.collect()
Shmuma commented 1 year ago

The root cause of this issue is the concurrent creation of subconnections from the cached main connection (the "Cache hit, reusing existing" lines below):

23/08/28 08:15:36 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 30c3b58ca6ef:34223 in memory (size: 7.5 KiB, free: 434.4 MiB)
23/08/28 08:15:36 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 30c3b58ca6ef:34223 in memory (size: 7.5 KiB, free: 434.4 MiB)
23/08/28 08:15:36 INFO ExasolRelation: Creating Spark RDD from Exasol query 'SELECT "COL1", "COL2" FROM (select * from TEST.TEST)'.
23/08/28 08:15:36 INFO ExasolRelation: Creating Spark RDD from Exasol query 'SELECT "COL1", "COL2" FROM (select * from TEST.TEST)'.
23/08/28 08:15:36 INFO CodeGenerator: Code generated in 25.363054 ms
23/08/28 08:15:36 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa:10.0.0.12:8563;validateservercertificate=0
23/08/28 08:15:36 INFO ExasolConnectionManager: Cache hit, reusing existing
23/08/28 08:15:36 INFO ExasolRDD: Initiated 2 parallel exasol (sub) connections
23/08/28 08:15:36 INFO ExasolRDD: Executing enriched query 'SELECT "COL1", "COL2" FROM (select * from TEST.TEST)'.
23/08/28 08:15:36 INFO ExasolRDD: The number of partitions is 2
23/08/28 08:15:36 INFO ExasolConnectionManager: Making a connection using url = jdbc:exa:10.0.0.12:8563;validateservercertificate=0
23/08/28 08:15:36 INFO ExasolConnectionManager: Cache hit, reusing existing
23/08/28 08:15:36 INFO ExasolRDD: Initiated 2 parallel exasol (sub) connections
23/08/28 08:15:36 INFO ExasolRDD: Executing enriched query 'SELECT "COL1", "COL2" FROM (select * from TEST.TEST)'.
23/08/28 08:15:36 INFO ExasolRDD: The number of partitions is 2
23/08/28 08:15:36 INFO SparkContext: Starting job: collect at <console>:1
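One way to avoid the race described above is to serialize subconnection creation per cached main connection. The sketch below is only illustrative, assuming hypothetical names (`ConnectionManager`, `createSubConnections`) rather than the connector's actual API; it shows the pattern of taking a per-key lock before the subconnection handshake so two RDDs sharing one cached connection cannot enter it concurrently:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch, not the connector's real code: guard
// subconnection creation per cached main-connection URL so that
// two tasks reusing the same cached connection cannot race.
object ConnectionManager {
  // One lock object per connection URL; computeIfAbsent is atomic.
  private val locks = new ConcurrentHashMap[String, AnyRef]()

  private def lockFor(url: String): AnyRef =
    locks.computeIfAbsent(url, _ => new Object)

  def createSubConnections(url: String, n: Int): Seq[String] = {
    // Only one caller at a time performs the (simulated)
    // subconnection handshake for a given main connection.
    lockFor(url).synchronized {
      (1 to n).map(i => s"$url;workerID=$i")
    }
  }
}
```

An alternative, as the repro hints, is to avoid sharing the cached main connection across RDDs in the same job entirely, at the cost of an extra login per RDD.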