Closed avatart93 closed 4 years ago
Why not just use the pulsar functions python framework ? https://pulsar.apache.org/docs/en/functions-api/
I am not a Spark expert, neither a Pulsar one, but I think I couldn't achieve with Pulsar functions half the stuff I could with a processing specific framework like Spark, maybe even Flink or Storm. Pulsar does a lot, but processing isn't one of its strongest capabilities (My own believe at this moment, if a Pulsar expert differs with my opinion, please open my eyes). So, been able to migrate my business logic elsewhere would be a nice perk.
Hi @avatart93 , we've already developed a Spark-Pulsar Connector that allows you to read/write data from/to Pulsar using Spark's new data source API, it can be used in Java and Python quite similar with Structured Streaming + Kafka Integration Guide. Moreover, life will be easier since we also integrate Pulsar Schema. We'll contribute it back soon.
At your side, you can use Spark-ML with DataFrame
API to do machine learning related tasks. Also, you can refer to Project Hydrogen for the latest effort of Spark-AI integration as well as existing AI project on Spark:
Big data for AI There are many efforts from the Spark community to integrate Spark with AI/ML frameworks: ● (Yahoo) CaffeOnSpark, TensorFlowOnSpark ● (Intel) BigDL ● (John Snow Labs) Spark-NLP ● (Databricks) spark-sklearn, tensorframes, spark-deep-learning ● … 80+ ML/AI packages on spark-packages.org
similar issue: https://github.com/apache/pulsar/issues/4585
Thanks a lot for your answer @yjshen I look forward to see this in next releases, although I don't see any milestones for this feature integration. Is this somewhat prioritized right now?
@avatart93 ah, it's my first priority now, will let you know as soon as we've got the contribute plan.
@avatart93 we've open-sourced pulsar-spark connector: https://github.com/streamnative/pulsar-spark. Please take a look and file issues when you have any difficulties.
@avatart93 we've open-sourced pulsar-spark connector: https://github.com/streamnative/pulsar-spark. Please take a look and file issues when you have any difficulties.
That's actually for Scala, weren't you building this receiver fro Python?
Hi Dexter,
The reason we created Pulsar Functions is because we noticed that most stream processing applications are actually not very complex. Thus, we created Pulsar Functions with a simple API but it does support powerful features and processing semantics such as stateful processing, windowing, exactly-once delivery, and etc. However, Pulsar Functions does not intend to replace other data processing platforms such as Spark.
Best,
Jerry
On Tue, Jul 9, 2019 at 9:42 AM Dexter notifications@github.com wrote:
@avatart93 https://github.com/avatart93 we've open-sourced pulsar-spark connector: https://github.com/streamnative/pulsar-spark. Please take a look and file issues when you have any difficulties.
That's actually for Scala, weren't you building this receiver fro Python?
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/apache/pulsar/issues/4608?email_source=notifications&email_token=AA3SFLZRH4PO4EQ3DSTDDKTP6S5XFA5CNFSM4H3UXCTKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGODZQ23IA#issuecomment-509717920, or mute the thread https://github.com/notifications/unsubscribe-auth/AA3SFL74F4H3FUR5ZELA2V3P6S5XFANCNFSM4H3UXCTA .
@avatart93 we've open-sourced pulsar-spark connector: https://github.com/streamnative/pulsar-spark. Please take a look and file issues when you have any difficulties.
That's actually for Scala, weren't you building this receiver fro Python?
Hi @avatart93 , actually, it doesn't matter what language the connector is written by, you are not actually talking to the connector directly, instead, you use pyspark to run your python application, and spark will find the connector to talk to Pulsar.
Therefore, you could just use our lib while opening a pyspark session:
bin/pyspark --packages io.streamnative.connectors:pulsar-spark-connector_2.11:2.4.0 --repositories https://dl.bintray.com/streamnative/maven
and it will show the dependency discovery procedure like:
Python 2.7.10 (default, Feb 22 2019, 21:17:52)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.37.14)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
https://dl.bintray.com/streamnative/maven added as a remote repository with the name: repo-1
Ivy Default Cache set to: /Users/yijie/.ivy2/cache
The jars for the packages stored in: /Users/yijie/.ivy2/jars
:: loading settings :: url = jar:file:/Users/yijie/app/spark-2.4.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.streamnative.connectors#pulsar-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-92522140-762c-4599-a7f6-d343a91c6463;1.0
confs: [default]
found io.streamnative.connectors#pulsar-spark-connector_2.11;2.4.0 in spark-list
found org.slf4j#slf4j-api;1.7.25 in spark-list
found io.swagger#swagger-annotations;1.5.21 in spark-list
found com.sun.activation#javax.activation;1.2.0 in spark-list
found org.slf4j#jul-to-slf4j;1.7.25 in spark-list
:: resolution report :: resolve 258ms :: artifacts dl 7ms
:: modules in use:
com.sun.activation#javax.activation;1.2.0 from spark-list in [default]
io.streamnative.connectors#pulsar-spark-connector_2.11;2.4.0 from spark-list in [default]
io.swagger#swagger-annotations;1.5.21 from spark-list in [default]
org.slf4j#jul-to-slf4j;1.7.25 from spark-list in [default]
org.slf4j#slf4j-api;1.7.25 from spark-list in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 5 | 0 | 0 | 0 || 5 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-92522140-762c-4599-a7f6-d343a91c6463
confs: [default]
0 artifacts copied, 5 already retrieved (0kB/7ms)
after the initialization, you could use talk to pulsar in your pyspark program:
write data to pulsar
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> df.write.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8088").option("topic", "abc").save()
read data from pulsar:
>>> a = spark.read.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8088").option("topic", "abc").load()
>>> x = a.collect()
>>> x
and it will show you the df you've just write to pulsar:
[Row(id=2, v=3.0, __key=None, __topic=u'persistent://public/default/abc', __messageId=bytearray(b'\x08\xbc\x02\x10\x00 \x00'), __publishTime=datetime.datetime(2019, 7, 10, 11, 45, 49, 341000), __eventTime=None), Row(id=2, v=10.0, __key=None, __topic=u'persistent://public/default/abc', __messageId=bytearray(b'\x08\xbc\x02\x10\x01 \x00'), __publishTime=datetime.datetime(2019, 7, 10, 11, 45, 49, 343000), __eventTime=None), Row(id=1, v=1.0, __key=None, __topic=u'persistent://public/default/abc', __messageId=bytearray(b'\x08\xbc\x02\x10\x02 \x00'), __publishTime=datetime.datetime(2019, 7, 10, 11, 45, 49, 341000), __eventTime=None), Row(id=1, v=2.0, __key=None, __topic=u'persistent://public/default/abc', __messageId=bytearray(b'\x08\xbc\x02\x10\x03 \x00'), __publishTime=datetime.datetime(2019, 7, 10, 11, 45, 49, 341000), __eventTime=None), Row(id=2, v=5.0, __key=None, __topic=u'persistent://public/default/abc', __messageId=bytearray(b'\x08\xbc\x02\x10\x04 \x00'), __publishTime=datetime.datetime(2019, 7, 10, 11, 45, 49, 343000), __eventTime=None)]
PySpark's datasource api is quite similar to that of Scala (with the same name and arguments), you could refer to https://github.com/streamnative/pulsar-spark/blob/master/README.md
for the whole list of viable usage and options.
@yjshen this seems real nice, thanks a lot. But I have one last issue, recently I've been asking through pulsar's slack (no replies yet), if I could use the java receiver to connect to a partitioned topic and receive data in shared mode through different threads distributed over spark worker nodes. I believe that Kafka can do this, and let's me avoid bottlenecks while receiving huge amounts of data (since I believe this threads will then be distributed over different executors). Can I do something like that with your receiver?
@avatart93 , currently with pulsar-spark-connector, you could read multiple topics/partitions of an identical schema as a single DataSet, internally, the consumption of each topic-partition is handled by one task, and inside it, a consumer is used to read data. It's quite similar to Kafka connector, no worries about this.
@yjshen So if I have a pulsar partitioned topic (lets say 12 partitions) then 12 spark tasks will be created if using both the java pulsar-spark receivers (official) and your implementation (this one). And last question, I saw your readme and notice you only do queries, but I can use it for standard basic consumption too right, like just receive messages and put them on rdds, no queries required (lets call it, the classic way).
@yjshen And by the way, let me please thank you again for your time and effort, this means a lot to me, thanks again for all your help.
@yjshen So if I have a pulsar partitioned topic (lets say 12 partitions) then 12 spark tasks will be created if using both the java pulsar-spark receivers (official) and your implementation (this one). And last question, I saw your readme and notice you only do queries, but I can use it for standard basic consumption too right, like just receive messages and put them on rdds, no queries required (lets call it, the classic way).
If you want to use RDD directly in your program instead of DataSet, you could just call .rdd
on the DataSet object.
I highlight queries in the doc since I thought queries are much easier to use as well as extend because it's typed and save efforts for format parsing, at the same time, the queries are likely to be more efficient since Spark SQL engine do a lot of optimizations not only during planning but also with code generation for efficient execution. Just for your information here.
Feel free to file issues you meet during using the connector, I'm glad to help. Enjoy 😊
Closed this issue. Since the pulsar-spark connector already supports structured streaming, you can use python language to program spark applications to use that connector.
@yjshen following your user example, I get this error
>>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))
>>> df.write.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8088").option("topic", "abc").save()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save
self._jwrite.save()
File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco
return f(*a, **kw)
File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o57.save.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:659)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:743)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:966)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:303)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 44 more
Do we have a version for Spark 3.1.1 and Spark 2.3?
@yjshen following your user example, I get this error
>>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v")) >>> df.write.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8088").option("topic", "abc").save() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save self._jwrite.save() File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__ File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco return f(*a, **kw) File "/Users/kevin/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o57.save. : java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255) at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249) at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at scala.collection.TraversableLike.filter(TraversableLike.scala:347) at scala.collection.TraversableLike.filter$(TraversableLike.scala:347) at scala.collection.AbstractTraversable.filter(Traversable.scala:108) at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:659) at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:743) at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:966) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:303) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 44 more
I received the same error, Probably the Scala and Spark version is not supported - Check what version you are using Not sure when the upgrade will be available - Available Connectors
Is your feature request related to a problem? Please describe. I would like to be able to use python's AI frameworks like tensorflow, scikit-learn or Apache Systemml from a Spark streaming app, but there is no pulsar-spark streaming receiver for python that allow me to achieve this.
Describe the solution you'd like I would like this (https://pulsar.apache.org/docs/v2.0.1-incubating/adaptors/PulsarSpark) feature for python too.
Describe alternatives you've considered At least, provide some sort of documentation about this topic.
Additional context This will enhance your feature parity with Kafka (https://spark.apache.org/docs/2.3.1/streaming-kafka-0-8-integration.html) which provide this feature and the direct stream for java, python and scala.