itsumma / spark-greenplum-connector

ITSumma Spark Greenplum Connector
MIT License

Connector does not work in local Spark mode #15

Open maxim-lixakov opened 2 months ago

maxim-lixakov commented 2 months ago

Description

The Spark-Greenplum connector does not work correctly in local Spark mode (local-master). When performing read operations, there is a significant waiting time, and write operations crash with a timeout error. The problem is reproduced on Spark 3.x versions when writing data to Greenplum via spark-greenplum-connector_2.12-3.1.jar.

Steps to reproduce

To reproduce this issue you need:

  1. start GP container:

docker-compose.yml:

services:
  greenplum:
    image: andruche/greenplum:6
    restart: unless-stopped
    ports:
      - 5432:5432
    extra_hosts:
      - host.docker.internal:host-gateway
    sysctls:
      - net.ipv6.conf.all.disable_ipv6=1
docker compose up -d greenplum
  2. set up a table with some test data:

CREATE DATABASE test;

CREATE TABLE public.employee (
    employee_id SERIAL PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    birth_date DATE,
    salary NUMERIC(10, 2),
    is_active BOOLEAN
) DISTRIBUTED BY (employee_id);

INSERT INTO public.employee (first_name, last_name, birth_date, salary, is_active) VALUES
    ('John', 'Doe', '1985-05-15', 55000.00, TRUE),
    ('Jane', 'Smith', '1990-10-25', 62000.00, TRUE),
    ('Mark', 'Johnson', '1978-03-12', 70000.00, FALSE),
    ('Lucy', 'Williams', '1983-07-19', 48000.00, TRUE);

SELECT * FROM employee;


  3. Build ``spark-greenplum-connector_2.12-3.1.jar`` from source. Out of the box it does not work in local mode;
     the problem is described in https://github.com/itsumma/spark-greenplum-connector/issues/14.
     I applied the changes suggested in that issue to the source code:

```scala

  def guessMaxParallelTasks(): Int = {
    val sparkContext = SparkContext.getOrCreate
    var guess: Int = -1
    val osName = System.getProperty("os.name")
    var isLocal: Boolean = false
    if (osName.toLowerCase().contains("windows") || osName.toLowerCase().contains("mac")) {
      isLocal = true
    }
    if (isLocal) {
      guess = sparkContext.getConf.getInt("spark.default.parallelism", 1) - 1;
    } else {
      while ((guess <= 0) && !Thread.currentThread().isInterrupted) {
        guess = sparkContext.getExecutorMemoryStatus.keys.size - 1
        if (sparkContext.deployMode == "cluster")
          guess -= 1
      }
    }
    guess
  }
```

  4. launch a Spark session, specifying spark.default.parallelism=2 so that the guess value is non-zero:
spark-shell --jars /path/to/spark-greenplum-connector/target/spark-greenplum-connector_2.12-3.1.jar --conf spark.default.parallelism=2
  5. run the code and confirm that the connector is not working properly:
val gpdf = spark.read.format("its-greenplum").option("url", "jdbc:postgresql://localhost:5432/test").option("user", "gpadmin").option("password", "").option("dbtable", "employee").load()
spark.sparkContext.setLogLevel("DEBUG")

// takes an average of 2 minutes
gpdf.show()

// takes an average of 2 minutes
gpdf.count()

val newEmployeeData = Seq(
  (9, "Alice", "Green", "1992-09-12", 58000.00, true),
  (10, "Bob", "White", "1980-01-23", 64000.00, false)
).toDF("employee_id", "first_name", "last_name", "birth_date", "salary", "is_active")

// fails with error:
// `Caused by: java.lang.Exception: Timeout 60000 elapsed for func=checkIn, {"queryId": "5b0d274c-e585-4d1d-a698-de00ab2bcf25", "partId": "1",
//                                                                         "instanceId": "1:4:0", "nodeIp": "192.168.1.69", "dir": "W",
//                                                                          "executorId": "driver", "batchNo": null, "gpSegmentId": null,
//                                                                         "rowCount": 0, "status": "null", "gpfdistUrl": "null"}`
newEmployeeData.write.format("its-greenplum").option("url", "jdbc:postgresql://localhost:5432/test").option("user", "gpadmin").option("password", "").option("dbtable","employee").mode("append").save()
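
The spark-shell command above sets spark.default.parallelism=2 because, in the local branch of the patched guessMaxParallelTasks, the guess is the configured parallelism minus one, with a default of 1. Without the setting the guess would be 0. A minimal sketch of just that arithmetic follows; `LocalGuess.localGuess` is a hypothetical helper written for illustration and is not part of the connector:

```scala
// Hypothetical helper that isolates the arithmetic of the patched local branch:
//   sparkContext.getConf.getInt("spark.default.parallelism", 1) - 1
object LocalGuess {
  // `defaultParallelism` stands in for the value of spark.default.parallelism;
  // None means the setting is absent, so the patch falls back to 1.
  def localGuess(defaultParallelism: Option[Int]): Int =
    defaultParallelism.getOrElse(1) - 1
}
```

With the setting absent, `LocalGuess.localGuess(None)` is 0; with `--conf spark.default.parallelism=2` it is 1. Note that the patch takes this branch only when os.name contains "windows" or "mac", which matches the macOS environment reported here.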

Logs

Logs while trying to read data from the table:

24/09/10 16:34:58 INFO RMISlave: 
coordinatorAsks: sqlTransferAbort, {"queryId": "a42adbf508224efa858b9a722482522d", "partId": "0",
                                    "instanceId": "0:0:2", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56701/output.pipe"}
24/09/10 16:34:58 DEBUG ProgressTracker: gpfCommitMs took 1038 ms
24/09/10 16:35:27 DEBUG RMIMaster: 
749 handlerAsks: checkIn, New gpfdist instance started on {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}
24/09/10 16:35:27 INFO RMIMaster: New batch: 0,
seg2ServiceProvider.keySet={0}
address2Seg={10.195.113.139 -> Set(0)}
pcbByInstanceId=
{0:0:3 -> {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}}
24/09/10 16:35:27 DEBUG RMIMaster: 
waitBatch success, localBatchNo=0, batchSize=1
24/09/10 16:35:27 INFO RMISlave: 
coordinatorAsks: sqlTransferComplete, {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}
24/09/10 16:35:27 DEBUG RMISlave: End of stream: threadId=132, instanceId=0:0:3
24/09/10 16:35:27 DEBUG GreenplumInputPartitionReader: gpfdist://10.195.113.139:56747/output.pipe epoch=0 end of stream, rowCount=4
24/09/10 16:35:27 WARN BufferExchange: doFlush: buff==null
24/09/10 16:35:27 INFO RMISlave: Calling commit on {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 4, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}
24/09/10 16:35:58 INFO RMIMaster: Batch 0: disconnected {"queryId": "a42adbf508224efa858b9a722482522d", "partId": "0",
                                    "instanceId": "0:0:2", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 1, "status": "a", "gpfdistUrl": "gpfdist://10.195.113.139:56701/output.pipe"}

As the logs show, there are gaps of about 30 seconds in the interaction between RMISlave and RMIMaster.
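
The gaps can be measured directly from the log timestamps. The sketch below assumes the `yy/MM/dd HH:mm:ss` prefix seen in the log lines above; `LogGaps` is a hypothetical helper, not part of Spark or the connector:

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper: computes the pauses between consecutive log timestamps.
object LogGaps {
  // Timestamp prefix as it appears in the log lines, e.g. "24/09/10 16:34:58".
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  def parse(ts: String): LocalDateTime = LocalDateTime.parse(ts, fmt)

  // Seconds elapsed between each pair of neighbouring timestamps.
  def gapsSeconds(timestamps: Seq[String]): List[Long] =
    timestamps.map(parse).sliding(2).collect {
      case Seq(a, b) => Duration.between(a, b).getSeconds
    }.toList
}
```

For the three distinct timestamps in the read log (16:34:58 for sqlTransferAbort, 16:35:27 for the next checkIn, 16:35:58 for the disconnect), `LogGaps.gapsSeconds` returns 29 and 31 seconds.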

Logs while trying to write data to the table:

24/09/05 14:41:09 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 3) (192.168.1.69 executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 4) (192.168.1.69 executor driver): java.rmi.UnexpectedException: unexpected exception; nested exception is: 
    java.lang.Exception: Timeout 60000 elapsed for func=checkIn, {"queryId": "5b0d274c-e585-4d1d-a698-de00ab2bcf25", "partId": "1",
                                    "instanceId": "1:4:0", "nodeIp": "192.168.1.69", "dir": "W",
                                    "executorId": "driver", "batchNo": null, "gpSegmentId": null,
                                    "rowCount": 0, "status": "null", "gpfdistUrl": "null"}
    at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:253)
    at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:180)
    at com.sun.proxy.$Proxy29.handlerAsks(Unknown Source)
    at com.itsumma.gpconnector.rmi.RMISlave.<init>(RMISlave.scala:204)
    at com.itsumma.gpconnector.writer.GreenplumDataWriter.init(GreenplumDataWriter.scala:50)
    at com.itsumma.gpconnector.writer.GreenplumDataWriter.write(GreenplumDataWriter.scala:74)
    at com.itsumma.gpconnector.writer.GreenplumDataWriter.write(GreenplumDataWriter.scala:27)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Timeout 60000 elapsed for func=checkIn, {"queryId": "5b0d274c-e585-4d1d-a698-de00ab2bcf25", "partId": "1",
                                    "instanceId": "1:4:0", "nodeIp": "192.168.1.69", "dir": "W",
                                    "executorId": "driver", "batchNo": null, "gpSegmentId": null,
                                    "rowCount": 0, "status": "null", "gpfdistUrl": "null"}
    at com.itsumma.gpconnector.rmi.RMIMaster.handlerAsks(RMIMaster.scala:383)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
    at sun.rmi.transport.Transport$1.run(Transport.java:200)
    at sun.rmi.transport.Transport$1.run(Transport.java:197)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
    at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:303)
    at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:279)
    at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:164)
    at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:235)
    ... 24 more

Driver stacktrace:)

Environment

Connector version: spark-greenplum-connector_2.12-3.1.jar

Java version, Scala version: Java 1.8.0, Scala 2.12

OS: macOS 13.4.1 (22F82)

hovercraft-github commented 2 months ago

Hello! The reason is probably that your Greenplum instance, running inside a Docker container, cannot reach the gpfdist server that the connector starts in the context of your Spark session.
For example, as the log shows, at that moment the gpfdist server was listening on gpfdist://10.195.113.139:56701/output.pipe, where 56701 is a dynamic random port that changes on every operation. So you could try adjusting your network routing rules to let the containerized application reach an arbitrary TCP port on 10.195.113.139.
However, we don't recommend running Greenplum or Spark in a container, because we haven't tested such a scenario and believe it doesn't make much sense.
There could also be another network-related problem in your setup:
the second log shows the address 192.168.1.69. How is it related to 10.195.113.139? Do you have several network cards, or did the network configuration change between runs?
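
A rough way to check both points from the Spark side is sketched below, using only the JDK. `GpfdistCheck` and its methods are hypothetical helpers, not connector APIs. A successful connect from the Spark host only shows that the listener is up; the check that matters is whether the same host and port are reachable from inside the Greenplum container, while an operation is still running, since the port changes every time.

```scala
import java.net.{Inet4Address, InetSocketAddress, NetworkInterface, Socket, URI}
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical diagnostic helpers; names are illustrative only.
object GpfdistCheck {
  // Extracts (host, port) from a URL such as gpfdist://10.195.113.139:56701/output.pipe.
  def hostAndPort(gpfdistUrl: String): Option[(String, Int)] =
    Try(new URI(gpfdistUrl)).toOption
      .filter(u => u.getHost != null && u.getPort > 0)
      .map(u => (u.getHost, u.getPort))

  // True if a TCP connection to host:port can be opened within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: java.io.IOException => false
    } finally socket.close()
  }

  // IPv4 addresses of all non-loopback interfaces that are up. More than one
  // entry could explain different nodeIp values appearing in different runs.
  def localIPv4Addresses(): Seq[String] =
    Option(NetworkInterface.getNetworkInterfaces).toSeq
      .flatMap(_.asScala)
      .filter(i => i.isUp && !i.isLoopback)
      .flatMap(_.getInetAddresses.asScala)
      .collect { case a: Inet4Address => a.getHostAddress }
}
```

For example, `GpfdistCheck.hostAndPort("gpfdist://10.195.113.139:56701/output.pipe")` yields the host and port to probe, and `GpfdistCheck.localIPv4Addresses()` lists the addresses the driver machine currently has.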

dolfinus commented 2 months ago

> The reason probably is that your Greenplum instance, running inside a docker container, is not able to reach the gpfdist server which the connector start in the context of your Spark session.

Reading data from the Greenplum container into the Spark executor works, but for some reason it takes a minute to read a table with just 4 rows. Writing from the Spark executor to the same Greenplum container fails with a timeout. Having network access for INSERT INTO WRITABLE EXTERNAL TABLE -> Spark executor gpfdist server but not for SELECT FROM READABLE EXTERNAL TABLE -> Spark executor gpfdist server does not sound plausible to me.

hovercraft-github commented 2 months ago

Oh, I see, you are right: reading "somehow" works, and writing doesn't work at all.
By the way, I doubt that applying the guessMaxParallelTasks patch you mentioned is a good idea.
The purpose of guessMaxParallelTasks is to find the number of executor instances, which doesn't necessarily correlate at all with the number of partitions in the DataFrame (spark.default.parallelism).
I will try to reproduce your case, but it may take some time. In the meantime, I'd experiment with the number of threads, as in this post
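
To make the distinction concrete: in local mode there is a single "driver" executor, and the number of task threads comes from the master URL rather than from spark.default.parallelism. The sketch below assumes that "number of threads" refers to the N in a `local[N]` master URL; the linked post is not reproduced here, so this is an assumption. `LocalMaster.localThreads` is a hypothetical helper, not a Spark or connector API.

```scala
// Hypothetical helper: derives the local thread count from a Spark master URL.
object LocalMaster {
  // Matches local[N], local[*], and the local[N,F] form with a max-failures value.
  private val LocalN = """local\[(\d+|\*)(?:\s*,\s*\d+)?\]""".r

  def localThreads(
      master: String,
      availableCores: Int = Runtime.getRuntime.availableProcessors()
  ): Option[Int] = master match {
    case "local"     => Some(1)              // plain local mode runs one thread
    case LocalN("*") => Some(availableCores) // one thread per available core
    case LocalN(n)   => Some(n.toInt)
    case _           => None                 // not a local master URL
  }
}
```

For instance, `LocalMaster.localThreads("local[4]")` gives `Some(4)`, while a cluster master such as `"yarn"` gives `None`. Starting the shell with `spark-shell --master local[4] ...` is one way to vary the thread count while testing.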