oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0
308 stars 68 forks

[stream] Does RayDP support Spark streaming data? #393

Open fwensen opened 9 months ago

fwensen commented 9 months ago

I wrote some code to consume Kafka stream data but got an error. Here is my RayDP code:

import ray
import raydp

ray.init()
packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB',
                         configs={
                             "spark.jars.packages": packages
                         })

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:29092") \
    .option("subscribe", "ray-dev") \
    .load()

def duf_fun(v):
    print(f"print {v}")
    return v

df2 = ray.data.from_spark(df).map(duf_fun).to_spark(spark)
query = (
    df2
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
)

query.awaitTermination()

but got this error:

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "/Users/xxxxxxx/workspace/work/ray-source/raydp_kafka.py", line 27, in <module>
    df2 = ray.data.from_spark(df).map(duf_fun).to_spark(spark)
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/ray/data/read_api.py", line 2301, in from_spark
    return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/raydp/spark/dataset.py", line 175, in spark_dataframe_to_ray_dataset
    num_part = df.rdd.getNumPartitions()
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 175, in rdd
    jrdd = self._jdf.javaToPython()
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka
kira-lin commented 9 months ago

No, I'm afraid RayDP does not support Kafka streaming. I'm not sure how large the gap is; we have never tested it.
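One possible workaround, not verified with RayDP (as the reply above says, this path has never been tested), is Spark's `foreachBatch` sink: it passes each micro-batch to a Python callback on the driver as an ordinary, non-streaming DataFrame, on which `.rdd` is allowed, so `ray.data.from_spark` can in principle be applied batch by batch. A minimal sketch under those assumptions; the broker, topic and Spark settings come from the original snippet, while `decode_row`, `process_batch`, `run_query` and the checkpoint path are hypothetical names.

```python
def decode_row(row: dict) -> dict:
    """Per-row Ray Data transform; Kafka delivers the message value as bytes."""
    value = row["value"]
    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8")
    return {"value": value}


def process_batch(batch_df, batch_id: int) -> None:
    """foreachBatch callback: batch_df is a static DataFrame, so .rdd works here."""
    import ray  # lazy import keeps decode_row usable without Ray installed

    if batch_df.rdd.isEmpty():
        return  # Kafka micro-batches can be empty
    # Assumption: RayDP's Spark-to-Ray conversion behaves the same inside foreachBatch.
    ds = ray.data.from_spark(batch_df.select("value")).map(decode_row)
    for row in ds.take(20):
        print(f"batch {batch_id}: {row}")


def run_query() -> None:
    """Start the Kafka stream on RayDP and hand each micro-batch to Ray Data."""
    import ray
    import raydp

    ray.init()
    spark = raydp.init_spark(
        app_name="RayDP Kafka foreachBatch sketch",
        num_executors=2,
        executor_cores=2,
        executor_memory="4GB",
        configs={
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
        },
    )
    stream_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:29092")
        .option("subscribe", "ray-dev")
        .load()
    )
    query = (
        stream_df.writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", "/tmp/raydp-kafka-checkpoint")  # hypothetical path
        .start()
    )
    query.awaitTermination()
```

Call `run_query()` with the broker running. The `print` output from `process_batch` appears on the driver, because `foreachBatch` callbacks execute there. The trade-off of this design is that every micro-batch pays for a separate Spark-to-Ray conversion, so it suits modest batch rates better than low-latency streaming.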