GoEddie / spark-connect-dotnet

MIT License
14 stars 3 forks source link

Long running requests keep being resubmitted #41

Closed tdb108 closed 4 days ago

tdb108 commented 5 days ago

Thanks for all your work on this project, it's really great!

Been having some trouble with long running grpc requests and the new RequestExecutor added in this commit I have a particular write request that takes a few mins to complete.

If understand the logic correctly you are cancelling requests after 1 min and then reattaching. The problem is if this line takes longer than 1min the first time image the _operationId is never set so it just keeps resubmitting the same plan again and again and CreateReattachRequest() is never called image

I have tried setting _operationId to my own guid as part of CreateRequest() image but the subsequent reattach request fails with

24/11/27 14:15:43 ERROR ErrorUtils: Spark Connect RPC error during: reattachExecute. UserId: tdb. SessionId: 07fabbbb-ea13-4173-b1f7-c3a1f7d655e9. org.apache.spark.SparkSQLException: [INVALID_CURSOR.POSITION_NOT_FOUND] The cursor is invalid. The cursor position id is not found. at org.apache.spark.sql.connect.execution.ExecuteResponseObserver.$anonfun$getResponseIndexById$1(ExecuteResponseObserver.scala:216) at scala.collection.MapLike.getOrElse(MapLike.scala:131) at scala.collection.MapLike.getOrElse$(MapLike.scala:129) at scala.collection.AbstractMap.getOrElse(Map.scala:65) at org.apache.spark.sql.connect.execution.ExecuteResponseObserver.getResponseIndexById(ExecuteResponseObserver.scala:214) at org.apache.spark.sql.connect.service.ExecuteHolder.runGrpcResponseSender(ExecuteHolder.scala:155) at org.apache.spark.sql.connect.service.SparkConnectReattachExecuteHandler.handle(SparkConnectReattachExecuteHandler.scala:62) at org.apache.spark.sql.connect.service.SparkConnectService.reattachExecute(SparkConnectService.scala:178) at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:763) at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346) at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860) at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source)

Any ideas or pointers. I'm using spark connect 3.5.3 at the moment.

GoEddie commented 4 days ago

Hey @tdb108 the problem is that when I first call execute I need to create my own operationId rather than waiting on the server sending me the initial one.

I didn't notice as in Spark 4.0 it uses a sort of keep alive so we get a response pretty quickly even if there is a long running operation.

I have pushed the fix in build 30, let me know if it works for you or if you are still getting issues.

tdb108 commented 4 days ago

Amazing! Thank you @GoEddie, works great. Interesting to know re Spark 4.0 - we will probably move our dev to 4 when the release candidate drops next year

GoEddie commented 4 days ago

Great, let me know if you need anything else