exasol / spark-connector

A connector for Apache Spark to access Exasol
Apache License 2.0

Committing transaction in the `finally` handler #197

Closed Shmuma closed 1 year ago

Shmuma commented 1 year ago

In `ExasolWriter`'s partition handler we are doing a very odd thing:

    } finally {
      stmt.close()
      subConn.commit()
      subConn.close()
    }

https://github.com/exasol/spark-connector/blob/main/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolWriter.scala#L117

There are several major issues in this code; most notably, committing in the `finally` block means the transaction is committed even when the write fails.
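A sketch of a safer teardown: commit only after the writes succeed, and roll back on failure. The stub classes stand in for `java.sql.Statement` / `java.sql.Connection` so the example is self-contained; the names mirror the snippet above, but this is an illustration, not the actual `ExasolWriter` code.

```scala
import java.sql.SQLException
import scala.collection.mutable.ListBuffer

// Records which teardown calls happen, so the failure path is visible.
val calls = ListBuffer.empty[String]

// Minimal stand-ins for java.sql.Statement / java.sql.Connection.
class StubStmt { def close(): Unit = calls += "stmt.close" }
class StubConn {
  def commit(): Unit = calls += "commit"
  def rollback(): Unit = calls += "rollback"
  def close(): Unit = calls += "conn.close"
}

def writePartition(stmt: StubStmt, subConn: StubConn)(body: => Unit): Unit = {
  var committed = false
  try {
    body             // insert this partition's rows
    subConn.commit() // commit only after the writes succeeded
    committed = true
  } finally {
    try stmt.close()
    finally {
      if (!committed) {
        // best-effort rollback; swallow secondary errors
        try subConn.rollback() catch { case _: SQLException => () }
      }
      subConn.close()
    }
  }
}

// Failure path: no commit, rollback instead, connection still closed.
try writePartition(new StubStmt, new StubConn) { throw new SQLException("boom") }
catch { case _: SQLException => () }
```

On the failure path above, `calls` ends up as `stmt.close`, `rollback`, `conn.close`: the transaction is never committed, unlike with `commit()` in the `finally` block.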

Shmuma commented 1 year ago

Tried to reproduce the early-close issue by inserting an uneven number of rows into the partitions, but without much luck. It looks like Exasol doesn't fail the other connections when one of them is closed (possibly undefined behaviour).

Code I tried:

    sc.setLogLevel("INFO")

    case class Entry(col1: String, col2: Int)

    val rdd = sc.parallelize(List(1, 1000000), 2)

    def explode(f: Iterator[Int]): Iterator[Entry] = {
        val cnt = f.next()
        for (i <- (1 to cnt).iterator) yield Entry(s"a-$cnt", i)
    }

    val df = rdd.mapPartitions(explode).toDF()

    df.write
      .mode("append")
      .format("exasol")
      .option("host", "10.0.0.12")
      .option("port", "8563")
      .option("username", "sys")
      .option("password", "PASS")
      .option("jdbc_options", "validateservercertificate=0")
      .option("table", "TEST.TEST")
      .save()

Log

    1: started Thu Aug 31 07:39:43 UTC 2023
    0: started Thu Aug 31 07:39:43 UTC 2023
    0: Written 1 rows: Thu Aug 31 07:39:43 UTC 2023
    1: Written 1000000 rows: Thu Aug 31 07:39:46 UTC 2023
Shmuma commented 1 year ago

Was able to reproduce the issue with the `--master local[1]` parameter to spark-shell. In that case, the first sub-connection is closed before the second sub-connection is opened, which causes a connection-refused error.

    23/08/31 09:30:29 INFO CodeGenerator: Code generated in 63.357139 ms
    23/08/31 09:32:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    com.exasol.jdbc.ConnectFailed: Connection was lost and could not be reestablished.  (SessionID: 1775736432917938177)
            at com.exasol.jdbc.AbstractEXAConnection.reconnect(AbstractEXAConnection.java:4233)
            at com.exasol.jdbc.ServerCommunication.handle(ServerCommunication.java:294)
            at com.exasol.jdbc.AbstractEXAConnection.communication(AbstractEXAConnection.java:3139)

In large systems this might happen when the database is large and the Spark cluster relatively small, so some partition tasks run sequentially rather than in parallel.
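Why sequential execution triggers it can be sketched without Spark at all. With a single worker thread (mirroring `--master local[1]`, or a cluster with fewer cores than partitions), the two "partition" tasks run strictly one after another, so task 0's teardown (commit and close) completes before task 1 even opens its sub-connection. This is a plain-JVM illustration, not the connector's actual scheduling code.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

// Single-threaded pool: tasks execute in submission order, never overlapping.
val events = ListBuffer.empty[String]
val pool = Executors.newFixedThreadPool(1)
for (task <- 0 to 1) {
  pool.submit(new Runnable {
    def run(): Unit = events.synchronized {
      events += s"$task: open sub-connection"
      events += s"$task: commit and close"
    }
  })
}
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
// events shows no interleaving: task 0 fully finishes before task 1 starts
```

With more worker threads than tasks the opens would overlap instead, which matches the earlier observation that the two-core run did not fail.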