gregbaker / spark-celery

Helper to allow Python Celery tasks to do work in a Spark job.
Apache License 2.0

Should I use `celery multi`, and how do I use it? #8

Closed shen-xianpeng closed 1 year ago

shen-xianpeng commented 6 years ago

Now, if I run tasks quickly like below, with no waiting, one after another: `async_exec.delay("a"); async_exec.delay("b")`, it raises an error:

File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1957, in wrapper return udf_obj(*args) File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1916, in call judf = self._judf File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1900, in _judf self._judf_placeholder = self._create_judf() File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1911, in _create_judf judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction( File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1474, in getattr raise Py4JError("{0} does not exist in the JVM".format(new_fqn)) Py4JError: org.apache does not exist in the JVM

`async_exec.delay("a").wait(); async_exec.delay("b").wait()` works fine.
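
For reference, a minimal reproduction sketch of the two calling patterns (assuming `async_exec` is a spark-celery task defined in `demo.py`; the import path and task name are illustrative):

```python
from demo import async_exec  # assumed: a spark-celery task registered in demo.py

# Failing pattern: queue both tasks back-to-back without waiting,
# so the worker may start the second task while the first still runs.
r1 = async_exec.delay("a")
r2 = async_exec.delay("b")
print(r1.get(), r2.get())  # Py4JError raised in the worker

# Working pattern: wait for each result before queuing the next,
# so the tasks execute strictly one after another.
print(async_exec.delay("a").wait())
print(async_exec.delay("b").wait())
```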

gregbaker commented 1 year ago

I can't reproduce this. I just tried this with the demo code, and it queues the 26 tasks as expected.

```python
from demo import *
import string

inputs = '/home/greg/tmp/spark-celery/wordcount'

# Queue 26 WordCount tasks at once, one per lowercase letter, without waiting.
tasks = [WordCount().delay('wordcount', c) for c in string.ascii_lowercase]

# Collect the results only after everything has been queued.
for t in tasks:
    print(t.wait())
```
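
If the original failure was tied to how many tasks the worker runs at once, one possible workaround (using standard Celery worker options, not anything spark-celery-specific, and not confirmed in this thread) is to force the worker to process queued tasks one at a time, which mirrors the serial `.wait()` behaviour the reporter saw working:

```bash
# Assumption: "demo" is the module containing the Celery app, and the worker
# is otherwise launched the way spark-celery expects (e.g. under spark-submit).
# --pool=solo and --concurrency=1 are standard Celery flags that make the
# worker execute tasks serially in a single process.
celery -A demo worker --pool=solo --concurrency=1
```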