dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.

[FEATURE REQUEST]: Add Trigger() to DataStreamWriter. #139

Closed: petmoy closed this issue 5 years ago

petmoy commented 5 years ago

Description

Hi, I am trying to create a simple proof-of-concept application using .NET Spark that streams data from Kafka.

Facing issues:

Exception with Spark 2.4.x

The sample code below works fine (i.e. it receives data from Kafka) with Spark version 2.3.x (microsoft-spark-2.3.x-0.3.0.jar) but throws a Java exception with Spark version 2.4.x (microsoft-spark-2.4.x-0.3.0.jar).

Sample Code

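        // Requires: using System; using System.Collections.Generic;
        //           using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
        static void Main(string[] args)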
        {
            SparkSession spark = SparkSession
                .Builder()
                .AppName("StructuredKafkaWordCount")
                .GetOrCreate();

            Console.WriteLine("============================= CONFIGURATION ===================");

            foreach(KeyValuePair<string, string> kvp in spark.SparkContext.GetConf().GetAll())
            {
                Console.WriteLine("Key = {0}, Value = {1}", kvp.Key, kvp.Value);
            }

            Console.WriteLine("============================= OUTPUT ===================");

            /*DataFrame lines = spark
                .ReadStream()
                .Format("socket")
                .Option("host", "0.0.0.0")
                .Option("port", 9999)
                .Load();*/

            DataFrame lines = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", "10.27.0.245:9092")
                .Option("subscribe", "test")
                .Load()
                .SelectExpr("CAST(value AS STRING)");

            DataFrame words = lines
                .Select(Explode(Split(lines["value"], " "))
                    .Alias("word"));
            DataFrame wordCounts = words.GroupBy("word").Count();

            Microsoft.Spark.Sql.Streaming.StreamingQuery query = wordCounts
                .WriteStream()
                .OutputMode("complete")
                .Format("console")
                .Start();

            query.AwaitTermination();
        }

To run the code, I build the project in VS2019 and then run the following in cmd:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 --class org.apache.spark.deploy.DotnetRunner --master local microsoft-spark-2.3.x-0.3.0.jar dotnet HelloSpark.dll

Also, you must have a Producer sending data to a Kafka topic from which to stream data.

petmoy commented 5 years ago

@imback82

imback82 commented 5 years ago

> Exception with Spark 2.4.x: the sample code below works fine (i.e. it receives data from Kafka) with Spark version 2.3.x (microsoft-spark-2.3.x-0.3.0.jar) but throws a Java exception with Spark version 2.4.x (microsoft-spark-2.4.x-0.3.0.jar)

For Spark 2.4.x, what command did you run? Did you make sure you are using the 2.4 version of spark-sql-kafka: https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10?

imback82 commented 5 years ago

> I cannot find how to achieve continuous streaming. Data are processed in batches periodically every n seconds

This is how Spark streaming works: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model

> In general, how to set configurations such as triggers, batch interval, window size, and sliding window size is not clear to me (or I am looking in the wrong place :) ). The available API documentation for .NET Spark has the Option(s) methods, but I cannot find the accepted valid options...

Please refer to the link I pasted above.

petmoy commented 5 years ago

@imback82 Yes, you are right, the spark-sql-kafka version was wrong.

petmoy commented 5 years ago

@imback82 Thanks for the help with the versions, but I don't see how the Maven link can help me with the configurations.

As I mentioned, I am trying to achieve continuous processing with .NET Spark. Here https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#triggers I see how this can be done in other languages, and I am looking for the equivalent for .NET Spark, since a Trigger API does not seem to exist.

imback82 commented 5 years ago

> @imback82 Thanks for the help with the versions, but I don't see how the Maven link can help me with the configurations.

Sorry, I fixed the link.

> As I mentioned, I am trying to achieve continuous processing with .NET Spark. Here https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#triggers I see how this can be done in other languages, and I am looking for the equivalent for .NET Spark, since a Trigger API does not seem to exist.

Got it. Looks like we are missing the API. We will add this in the next release.
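
For readers landing here later, a minimal sketch of what setting a trigger from C# can look like. It assumes a `Trigger` class in `Microsoft.Spark.Sql.Streaming` that mirrors Spark's `Trigger.java` (`ProcessingTime`, `Continuous`) and a `DataStreamWriter.Trigger()` method, which is the shape used in the code posted further down this thread. The built-in `rate` source, the app name, and the class name are illustrative choices and do not come from the original report.

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;

class TriggerSketch
{
    static void Main(string[] args)
    {
        SparkSession spark = SparkSession
            .Builder()
            .AppName("TriggerSketch") // hypothetical name
            .GetOrCreate();

        // Spark's built-in "rate" test source emits (timestamp, value) rows,
        // so this sketch needs no Kafka broker or socket server.
        DataFrame rows = spark
            .ReadStream()
            .Format("rate")
            .Option("rowsPerSecond", 1)
            .Load();

        // Micro-batch mode: start a new batch every 2000 ms.
        // Assumption: Trigger.ProcessingTime(long) exists as in Trigger.java.
        // For continuous processing, the analogous call would be
        // Trigger.Continuous(2000), where 2000 ms is the checkpoint interval.
        StreamingQuery query = rows
            .WriteStream()
            .OutputMode("append")
            .Format("console")
            .Trigger(Trigger.ProcessingTime(2000))
            .Start();

        query.AwaitTermination();
    }
}
```

Without an explicit trigger, Spark runs the query in micro-batch mode and starts each batch as soon as the previous one finishes, which matches the batch-by-batch behavior described in the original report.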

danny8002 commented 5 years ago

Waiting for this feature to be ready.

imback82 commented 5 years ago

@danny8002 we expect the next release to be in the second week of July.

danny8002 commented 5 years ago

That is too late for me; could you please send a workable PR? I just implemented Trigger() myself, see #153 (I wrote it following spark/Trigger.java). But when I tested it, I found that Trigger.Continuous() doesn't work (Trigger.ProcessingTime works).

Here is the code with Trigger.Continuous():

            var kafkaServer = string.Join(",", args.Input.KafkaHost);
            var inputKafka = args.Input.Topic;
            var outputKafka = args.Output.Topic;

            SparkSession spark = SparkSession
                .Builder()
                .AppName("Play Spark")
                .GetOrCreate();

            var udfReg = spark.Udf();
            udfReg.Register<string, string, string>("verify", (a,b)=>a+b);

            DataFrame lines = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", kafkaServer)
                .Option("subscribe", inputKafka)
                .Load()
                .SelectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

            DataFrame verify = lines.Select(
                Functions.CallUDF("verify", lines.Col("key"), lines.Col("value")).Alias("value"))
                .WithColumn("topic", Functions.Lit(outputKafka));

            var toKafka = verify
                 .WriteStream()
                 .Format("kafka")
                 .Option("kafka.bootstrap.servers", kafkaServer)
                 .Option("checkpointLocation", "file:///d:/temp/abc")
                 .Trigger(Trigger.Continuous(2000))
                 .OutputMode(OutputMode.Update)
                 .Start();

            toKafka.AwaitTermination();

Microsoft.Spark.Worker.exe runs without receiving any data and then closes immediately (my other program keeps sending data to the input Kafka topic at 2 records/s). See the logs below.

stderr

Spark Executor Command: "C:\Progra~1\Java\jdk1.8.0_31\\bin\java" "-cp" "D:\work\v2\spark-2.3.1-bin-hadoop2.7\bin\..\conf\;D:\work\v2\spark-2.3.1-bin-hadoop2.7\jars\*" "-Xmx16384M" "-Dspark.driver.port=5108" "-Dspark.network.timeout=300s" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@localhost:5108" "--executor-id" "0" "--hostname" "localhost" "--cores" "2" "--app-id" "app-20190627112318-0016" "--worker-url" "spark://Worker@localhost:25332"
========================================

DotnetWorker PID:[19044] Args:[-m pyspark.worker] SparkVersion:[2.3.1]
[2019-06-27T03:23:27.0367984Z] [SZ_M3] [Info] [SimpleWorker] RunSimpleWorker() is starting with port = 5240.
[2019-06-27T03:23:27.1087977Z] [SZ_M3] [Info] [TaskRunner] [0] Starting with ReuseSocket[False].
[2019-06-27T03:23:27.1658004Z] [SZ_M3] [Debug] [TaskRunner] [0] Received END_OF_STREAM signal.
[2019-06-27T03:23:27.1658004Z] [SZ_M3] [Info] [TaskRunner] [0] Processed a task: readComplete:True, entries:0
[2019-06-27T03:23:27.1668005Z] [SZ_M3] [Info] [TaskRunner] [0] Waiting for JVM side to close socket.
[2019-06-27T03:23:27.2238012Z] [SZ_M3] [Info] [TaskRunner] [0] JVM side has closed socket.
[2019-06-27T03:23:27.2238012Z] [SZ_M3] [Info] [TaskRunner] [0] Finished running 1 task(s).
[2019-06-27T03:23:27.2238012Z] [SZ_M3] [Info] [SimpleWorker] RunSimpleWorker() finished successfully

stdout

2019-06-27 12:10:13 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.5 KB, free 8.4 GB)
2019-06-27 12:10:18 INFO  TorrentBroadcast:54 - Reading broadcast variable 0 took 5101 ms
2019-06-27 12:10:18 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 17.3 KB, free 8.4 GB)
2019-06-27 12:10:19 ERROR Executor:91 - Exception in task 0.1 in stage 0.0 (TID 1)
org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDDIter.scala:53)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2019-06-27 12:10:24 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 2

master log:

2019-06-27 11:24:33 INFO  AbstractCoordinator:542 - Marking the coordinator 10.0.75.1:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-f9bf6566-f878-4057-98bb-4c51066aa826--111507653-driver-0
Exception in thread "epoch update thread for [id = ecaa75ec-cfaa-422f-8f96-efb673f47cb3, runId = 0d093931-162e-417f-a30f-f17ef701e1d4]" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [300 seconds]. This timeout is controlled by spark.network.timeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2$$anonfun$run$1.apply$mcZ$sp(ContinuousExecution.scala:246)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2.run(ContinuousExecution.scala:235)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

Hoping for your help!

imback82 commented 5 years ago

> it is too late for me, could you please send a workable PR? I just implement Trigger() by myself, see #153 ,

@danny8002 I will review this.

> Microsoft.Spark.Worker.exe runs without data, and then closed immediately (my other program keeps sending data to input kafka with qps = 2 records/s)

This is expected for SimpleWorker. It will be re-launched per task.

Can you first check whether continuous mode works fine without UDFs (so that it doesn't involve the worker)?

danny8002 commented 5 years ago

Yes, it works without the UDF:

            DataFrame verify = lines.Select(
                Functions.CallUDF("verify", lines.Col("key"), lines.Col("value")).Alias("value"))
                .WithColumn("topic", Functions.Lit(outputKafka));

=>

            DataFrame verify = lines.Select(
                Functions.Concat(lines.Col("key"), lines.Col("value")).Alias("value"))
                .WithColumn("topic", Functions.Lit(outputKafka));

You say this is expected, so how can I get the UDF to work? How do I get the worker/task to start again? I wrote the same program in Java and it works with the UDF, and I am curious how Python handles this...

danny8002 commented 5 years ago

Any insight on running UDFs?

suhsteve commented 5 years ago

@danny8002 I've reproduced your test using pyspark, and I'm also encountering issues when using a continuous trigger with UDFs. This seems to be a known issue (SPARK-27234), and there is an active PR that should address it.
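
Until that is resolved upstream, one possible fallback is to keep the UDF and switch to a micro-batch trigger, since danny8002 reported above that Trigger.ProcessingTime works. The sketch below is illustrative only: it assumes the `Trigger` API used earlier in this thread, and the broker address, topic names, checkpoint path, and class name are hypothetical placeholders.

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;

class MicroBatchUdfSketch
{
    static void Main(string[] args)
    {
        // Hypothetical placeholders; substitute your own values.
        string kafkaServer = "localhost:9092";
        string inputTopic = "input";
        string outputTopic = "output";
        string checkpointDir = "file:///tmp/microbatch-udf-checkpoint";

        SparkSession spark = SparkSession
            .Builder()
            .AppName("MicroBatchUdfSketch")
            .GetOrCreate();

        // Same UDF as in the report above: concatenates key and value.
        spark.Udf().Register<string, string, string>("verify", (a, b) => a + b);

        DataFrame lines = spark
            .ReadStream()
            .Format("kafka")
            .Option("kafka.bootstrap.servers", kafkaServer)
            .Option("subscribe", inputTopic)
            .Load()
            .SelectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        DataFrame verify = lines
            .Select(Functions.CallUDF("verify", lines.Col("key"), lines.Col("value")).Alias("value"))
            .WithColumn("topic", Functions.Lit(outputTopic));

        // Micro-batch trigger (one batch every 2000 ms) instead of
        // Trigger.Continuous(2000), so the UDF runs in ordinary batch tasks.
        StreamingQuery query = verify
            .WriteStream()
            .Format("kafka")
            .Option("kafka.bootstrap.servers", kafkaServer)
            .Option("checkpointLocation", checkpointDir)
            .Trigger(Trigger.ProcessingTime(2000))
            .OutputMode(OutputMode.Update)
            .Start();

        query.AwaitTermination();
    }
}
```

The trade-off is latency: records are processed once per batch interval rather than as they arrive.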