pgiri / dispy

Distributed and Parallel Computing Framework with / for Python
https://dispy.org

cluster.wait() returns before call-back is done #153

Open UnitedMarsupials-zz opened 5 years ago

UnitedMarsupials-zz commented 5 years ago

When creating the cluster, I specify a callback to process each job's results asynchronously (it writes the results into several files). After submitting everything, the script calls cluster.wait(). Once that method returns, the script closes the output files:

for batch in batches:
    batch = str(batch[0])
    job = cluster.submit(batch)
    job.id = batch

logger.info('Waiting for processing to finish')
cluster.wait()
cluster.print_status()

logger.info('Processing finished')
for MPOR, fd in Outputs.items():
    logger.info('Closing output file for MPOR %s' % MPOR)
    fd.close()

Unfortunately, cluster.wait() may exit before the specified callback is done with the last job's results -- leading to data-loss in my case. Here are the events as they happened at the end of a 5-hour run:

2018-09-24 06:11:43 dispy - Received reply for job 4 / 46846072 from 10.78.18.54
2018-09-24 06:11:43 dispy - Batch 4 has 0 missing entries, 420 result-sets
2018-09-24 06:11:43 dispy - Processing finished
2018-09-24 06:11:43 dispy - Closing output file for MPOR 20
2018-09-24 06:11:43 dispy - Closing output file for MPOR 5
2018-09-24 06:11:43 dispy - Closing node 10.78.6.104 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.4.82 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.14.66 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.4.83 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.18.54 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.18.91 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.12.87 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.78.6.105 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Closing node 10.109.51.131 for processBatch / 1537764714379
2018-09-24 06:11:43 dispy - Shutting down scheduler ...
2018-09-24 06:11:43 dispy - Scheduler quitting: 0
2018-09-24 06:11:43 dispy - Scheduler quit
2018-09-24 06:11:43 pycos - terminating task !timer_proc/44846520 (daemon)
2018-09-24 06:11:43 pycos - terminating task !tcp_server/44858472 (daemon)
2018-09-24 06:11:43 pycos - pycos terminated
Traceback (most recent call last):
  File "orchestrator.py", line 133, in done
    processSuccess(job.result)
  File "orchestrator.py", line 119, in processSuccess
    (Id, c, p))
ValueError: I/O operation on closed file

The second line above -- logging statistics about the last job -- comes from my callback. Unfortunately, midway through the handler (processSuccess) writing out the results, the output files got closed by the main thread.

cluster.wait() ought to wait until all of the callbacks have returned...

pgiri commented 5 years ago

cluster.wait() finishes when all submitted jobs have finished executing. Callbacks run in another thread (so if a callback takes time to finish, the cluster is not blocked from receiving other job replies, etc.). cluster.wait() can be called by client as many times as necessary (e.g., to submit a bunch of jobs and wait for them to finish before submitting more jobs and waiting for those to finish).

If synchronization is required for callbacks, the client can use an event or condition variable to maintain status of submitted vs finished callbacks; for example, bounded_submit.py example uses a condition variable to submit jobs to keep scheduler busy.
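A minimal sketch of the condition-variable approach described above, not taken from dispy or from bounded_submit.py. The CallbackTracker class and its method names are hypothetical, and plain threads stand in for dispy's callback thread so the snippet runs on its own. It assumes the callback is invoked exactly once per submitted job.

```python
import threading
import time


class CallbackTracker:
    """Hypothetical helper: counts submitted jobs vs. callbacks that returned."""

    def __init__(self):
        self._cond = threading.Condition()
        self._pending = 0

    def submitted(self):
        # Call this just before each job is submitted.
        with self._cond:
            self._pending += 1

    def wrap(self, callback):
        # Returns a callback that reports completion once the real one returns.
        def wrapper(job):
            try:
                callback(job)
            finally:
                with self._cond:
                    self._pending -= 1
                    if self._pending == 0:
                        self._cond.notify_all()
        return wrapper

    def wait(self, timeout=None):
        # Blocks until every wrapped callback has returned (or the timeout expires).
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)


# Stand-in for a slow result-writing callback.
results = []

def slow_callback(job):
    time.sleep(0.05)
    results.append(job)

tracker = CallbackTracker()
wrapped = tracker.wrap(slow_callback)

# Plain threads simulate the callback thread; integers simulate job objects.
for n in range(5):
    tracker.submitted()
    threading.Thread(target=wrapped, args=(n,)).start()

all_done = tracker.wait(timeout=5)
```

With a real cluster, the wrapped function would presumably be passed as the callback when the cluster is created, and tracker.wait() would be called after cluster.wait() and before closing any output files.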

Maybe the documentation for cluster.wait() should be expanded with this detail.

UnitedMarsupials-zz commented 5 years ago

cluster.wait() finishes when all submitted jobs have finished executing

That's a flaw, then. The method needs to wait for all of the callbacks to return too. A flaw, which also makes the earlier-discussed new job-submissions from inside a callback difficult...

The examples all suggest that cluster.wait() returns when the processing of the submitted jobs is done -- logically, that ought to include the processing of the results by callbacks.

cluster.wait() can be called by client as many times as necessary

Yes, that's good -- and I'm not questioning this part at all.

If synchronization is required for callbacks

Synchronization is not required for callbacks -- not in my case. But I do expect the cluster to still exist while the callbacks are executing... Yet, unless some special hacks are implemented, the main thread may exit before the callback(s) finish -- that's not right.

bounded_submit.py example uses a condition variable to submit jobs to keep scheduler busy.

In my case, that is not -- or should not be -- necessary. I submit all of the jobs and leave it to Dispy to schedule. A vanilla case like mine should not require special synchronization efforts...

UnitedMarsupials-zz commented 5 years ago

FWIW, I implemented the following as a work-around:

import threading
CV = threading.Condition()
...

def done(task):
    # ... process the task ...
    with CV:
        CV.notify_all()

...
while len(cluster._jobs) + cluster._pending_jobs > 0:
    logger.info('Waiting for %d jobs to finish' % (len(cluster._jobs) + cluster._pending_jobs))
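    with CV:
        # The timeout guards against a notification that arrives between
        # the check above and this wait(), which would otherwise hang.
        CV.wait(timeout=1)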

So, every time the job callback (done) is invoked, it notifies the condition variable before returning, which wakes up the main thread. The main thread stays in the loop until there are no jobs remaining. This seems to work, but I am still convinced that this should be happening inside Dispy.

xptree commented 5 years ago

Hi @UnitedMarsupials and @pgiri, I ran into this problem as well.

I tried using threading.Event to solve it.

import threading
import dispy

def job_callback(job):
    # dump the job.result to file ...
    events[job.id].set()

cluster = dispy.JobCluster(compute, callback=job_callback)

events = dict()
for i in range(100):
    events[i] = threading.Event()
    job = cluster.submit(i)
    job.id = i

# This method waits for all jobs to finish execution, but not for all callbacks to be executed.
cluster.wait()
cluster.close()

for e in events.values():
    e.wait()

However, this introduces a new problem: a job may finish before the main thread has assigned its id (job.id is initially None), so the callback cannot look up the corresponding Event object.
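One way to sidestep the job.id race is to count completed callbacks instead of looking them up by id. The sketch below is an illustration under assumptions, not something proposed in this thread: it uses a threading.Semaphore, plain threads stand in for dispy's callback invocations, and the names NUM_JOBS, callback_done and written are hypothetical. It assumes each job triggers the callback exactly once.

```python
import threading

NUM_JOBS = 10                            # hypothetical number of submitted jobs
callback_done = threading.Semaphore(0)   # released once per finished callback
written = []                             # stand-in for the output files


def job_callback(job):
    try:
        written.append(job)              # stand-in for dumping job.result to a file
    finally:
        callback_done.release()          # no job.id lookup needed


# Plain threads simulate the callbacks; integers simulate job objects.
for n in range(NUM_JOBS):
    threading.Thread(target=job_callback, args=(n,)).start()

# Main thread: one acquire per submitted job, so this loop ends only after
# every callback has returned (the timeout avoids waiting forever).
finished = all(callback_done.acquire(timeout=5) for _ in range(NUM_JOBS))
```

Because the main thread only needs to know how many callbacks have returned, it does not matter whether job.id has been assigned by the time a callback runs.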

My final solution is to pass the job id as an argument to my computation, print it to stdout, and then extract it from job.stdout in the callback, e.g.,

def compute(job_id):
    print("###job_id=%s###" % job_id)
    # computation

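import re

def job_callback(job):
    if job.id is None:
        # job.id was not assigned yet; recover it from the job's stdout
        match = re.search(r"###job_id=([\w.]+)###", job.stdout)
        if not match:
            dispy.logger.info("failed to find job id...")
            return
        job.id = match.group(1)  # note: this is a string, not an int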

Let me know if you have any suggestions.