gregbaker / spark-celery

Helper to allow Python Celery tasks to do work in a Spark job.
Apache License 2.0

AttributeError: 'SparkCeleryApp' object has no attribute 'sc' #6

Closed NeelBhowmik closed 1 year ago

NeelBhowmik commented 6 years ago

Dear all, I am trying to use spark-celery in my application. When I run the code below, I get the error "AttributeError: 'SparkCeleryApp' object has no attribute 'sc'". Could you please suggest where the problem might be? Thank you. Here is the code.

import os
from spark_celery import SparkCeleryApp, SparkCeleryTask, RDD_builder, main

backendURL = os.getenv("BACKEND_URL", "redis://redis:6379/0")
brokerURL = os.getenv("BROKER_URL", "rabbitmq")
brokerUser = os.getenv("BROKER_USER", "guest")
brokerPassw = os.getenv("BROKER_PASSW", "guest")

def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.enabled', 'true') \
        .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1) \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 20) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60)

app = SparkCeleryApp(broker='pyamqp://' + brokerUser + ":" + brokerPassw + "@" + brokerURL + '//',
                     backend=backendURL,
                     sparkconfig_builder=sparkconfig_builder)

In the main block, I create an instance qs of class QuerySystem(cbir_defunc.QuerySystem, SparkCeleryTask):, and then I get the error.

if __name__ == '__main__':
    qs = QuerySystem(CB_file, idx_file, knn)
    print(app.sc)   # <----- here is the error
    app.tasks.register(qs)
    main(options={
        'queues': 'querySystemQueue',
    })

gregbaker commented 6 years ago

The Spark context stuff isn't created until app.worker_init(). That's called from the main() function.

NeelBhowmik commented 6 years ago

Hello, thanks for the suggestion. But I am still getting the same error, even after trying your solution. Is there any other workaround? Thanks.

Kind regards, Neel

nguyenign commented 6 years ago

I have the same problem: the Spark context is not initialized. I had to create it manually outside of spark-celery, but that is not a good solution.

nguyenign commented 6 years ago

Hi All,

The function worker_init(self, loader) requires a loader argument. Can you please tell me what loader is?

I get an error when I call this function:

def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.enabled', 'true') \
        .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1) \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 20) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60)

app = SparkCeleryApp(broker='pyamqp://' + brokerUser + ":" + brokerPassw + "@" + brokerURL + '//',
                     backend=backendURL,
                     sparkconfig_builder=sparkconfig_builder)

app.worker_init()

The error message that I get:

app.worker_init()
TypeError: worker_init() missing 1 required positional argument: 'loader'

gregbaker commented 6 years ago

You shouldn't call worker_init() yourself. To get things going:

  1. create app (as you have)
  2. register some tasks
  3. call the spark_celery.main function to get things started (a fuller sketch follows the snippet below).
from spark_celery import main
if __name__ == '__main__':
    # When called as a worker, run as a worker.
    main()
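
For reference, here is a minimal end-to-end sketch of those three steps. The WordCount class, its name attribute, and its run() body are hypothetical and only for illustration; the broker/backend settings and the app.tasks.register(...) call are taken from the snippets earlier in this thread.

import os
from spark_celery import SparkCeleryApp, SparkCeleryTask, main

def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask')

# 1. create the app (no SparkContext exists yet at this point)
app = SparkCeleryApp(broker='pyamqp://guest:guest@' + os.getenv('BROKER_URL', 'rabbitmq') + '//',
                     backend=os.getenv('BACKEND_URL', 'redis://redis:6379/0'),
                     sparkconfig_builder=sparkconfig_builder)

# 2. register some tasks
class WordCount(SparkCeleryTask):
    name = 'WordCount'

    def run(self, path):
        # By the time a task executes, main() has already called
        # app.worker_init(), so app.sc is available here.
        return app.sc.textFile(path).map(lambda line: len(line.split())).sum()

app.tasks.register(WordCount())

# 3. start the worker
if __name__ == '__main__':
    # When called as a worker, run as a worker.
    main()

Everything that needs app.sc happens inside run(), which only executes after the worker has been initialized; touching app.sc at module level, before main(), is what produces the AttributeError above.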
nguyenign commented 6 years ago

Thank you very much for your solution.

I tried it, but I still have a problem: I need the Spark context in the registered task to broadcast a database file, but the Spark context is only initialized after I register the task and then execute the main function. So I cannot attach the Spark context to my task as follows:

from spark_celery import main
if __name__ == '__main__':
     app.tasks.register(qs)
     main()    
     qs.setSparkContext(app.sc)
     qs.broadCasting()
     print('broadcasting done')

Do you have any idea?
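
For context, main() runs the Celery worker, which normally blocks until it shuts down, so the lines after it never execute. One pattern that fits this constraint is to defer the broadcast until the task is executing inside the worker, where app.sc exists. A rough sketch, reusing the names from the snippets above (the _ensure_broadcast guard and the run() body are assumptions for illustration, not spark-celery API):

class QuerySystem(cbir_defunc.QuerySystem, SparkCeleryTask):
    name = 'QuerySystem'

    def _ensure_broadcast(self):
        # Hypothetical guard: broadcast once, lazily, inside the worker,
        # where app.sc is guaranteed to exist.
        if not getattr(self, '_broadcast_done', False):
            self.setSparkContext(app.sc)
            self.broadCasting()
            self._broadcast_done = True

    def run(self, query):
        self._ensure_broadcast()
        ...  # actual query logic goes here

if __name__ == '__main__':
    qs = QuerySystem(CB_file, idx_file, knn)
    app.tasks.register(qs)
    main(options={'queues': 'querySystemQueue'})   # blocks while the worker runs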