pgiri / dispy

Distributed and Parallel Computing Framework with / for Python
https://dispy.org
Other
266 stars 55 forks source link

DispyJob doesn't change its status on node failure #170

Closed Dementiy closed 5 years ago

Dementiy commented 5 years ago

I launched the scheduler and several nodes. Then one of the nodes fails by any reason. I expected that the jobs scheduled on this node will get status "Terminated" or "Abandoned", but their status hasn't changed ("Running"). How to recognize that jobs are no longer performed?

Simplified example:

import dispy
import threading
import time

def compute():
    import time
    time.sleep(30)

def run(cluster):
    job = cluster.submit()
    while True:
        print(f"Job(id={job.id}, status={job.status}, addr={job.ip_addr})")
        if job.status == dispy.DispyJob.Finished:
            break
        time.sleep(5)

def main():
    cluster = dispy.SharedJobCluster(
        compute,
        ip_addr='127.0.0.1',
        port=0,
        scheduler_node='127.0.0.1',
        loglevel=dispy.logger.DEBUG,
    )

    threads = []
    for _ in range(10):
        t = threading.Thread(target=run, args=(cluster,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    cluster.close()

if __name__ == "__main__":
    main()
...
Job(id=1, status=6, addr=127.0.0.1)
Job(id=2, status=6, addr=127.0.0.1)
...
# Kill dispynode process (jobs still have status "Running")
...
Job(id=1, status=6, addr=127.0.0.1)
Job(id=2, status=6, addr=127.0.0.1)
...
2019-01-22 22:12:21 dispy - Ignoring invalid reply for job 4386689936 from 127.0.0.1
2019-01-22 22:12:23 dispy - Ignoring invalid reply for job 4386689936 from 127.0.0.1
pgiri commented 5 years ago

In your case, you need to start dispyscheduler with --pulse_interval option. Then dispynode will report status every pulse seconds. If scheduler doesn't receive status up to 5*pulse seconds, it will deem that node as zombie and will mark jobs running it as likely dead. Then it will send that status to client.

Dementiy commented 5 years ago

I used this options, neither pulse_interval nor zombie_interval doesn't help:

dispyscheduler.py -i 127.0.0.1 -d --daemon --clean --pulse_interval 2

Client log:

Job(id=1, status=6, addr=...)
Job(id=2, status=6, addr=...)
Job(id=3, status=6, addr=...)
...
# Kill dispynode process
...
2019-01-23 12:23:03 pycos - uncaught exception in !tcp_req/65792304:
Traceback (most recent call last):
  File "D:\py37\lib\site-packages\pycos\__init__.py", line 3665, in _schedule
    retval = task._generator.throw(*exc)
  File "D:\py37\lib\site-packages\dispy\__init__.py", line 1049, in tcp_req
    yield self.job_reply_process(info, len(msg), conn, addr)
  File "D:\py37\lib\site-packages\pycos\__init__.py", line 3667, in _schedule
    retval = task._generator.send(task._value)
  File "D:\py37\lib\site-packages\dispy\__init__.py", line 1841, in job_reply_process
    job.result = deserialize(reply.result)
  File "D:\py37\lib\site-packages\pycos\__init__.py", line 74, in deserialize
    return pickle.loads(pkl)
TypeError: a bytes-like object is required, not 'NoneType'
...
Job(id=1, status=6, addr=...)
Job(id=2, status=6, addr=...)
Job(id=3, status=6, addr=...)
...
2019-01-23 12:24:03 dispy - Ignoring invalid reply for job 71826224 from 127.0.0.1
2019-01-23 12:24:05 dispy - Ignoring invalid reply for job 71826224 from 127.0.0.1

Scheduler log:

...
2019-01-23 12:22:42 dispyscheduler - Running job 71827056 on ... (busy: 4 / 4)
2019-01-23 12:22:42 dispyscheduler - Running job 72089712 on ... (busy: 4 / 4)
2019-01-23 12:22:42 dispyscheduler - Running job 72091568 on ... (busy: 4 / 4)
2019-01-23 12:23:03 dispyscheduler - Node 10.57.46.38 is not responding; removing it (4.0, 1548235372.8564093, 1548235383.6781967)
2019-01-23 12:23:03 dispyscheduler - Terminating job 71826224 scheduled on ...
2019-01-23 12:23:03 dispyscheduler - Terminating job 71827056 scheduled on ...
2019-01-23 12:23:03 dispyscheduler - Terminating job 72089712 scheduled on ...
2019-01-23 12:23:03 dispyscheduler - Terminating job 72091568 scheduled on ...
2019-01-23 12:23:13 dispyscheduler - Could not send reply for job 72091568 to 127.0.0.1:52672; saving it in ...
2019-01-23 12:23:13 dispyscheduler - Could not send reply for job 72089712 to 127.0.0.1:52672; saving it in ...
2019-01-23 12:23:13 dispyscheduler - Could not send reply for job 71827056 to 127.0.0.1:52672; saving it in ...
2019-01-23 12:23:13 dispyscheduler - Could not send reply for job 71826224 to 127.0.0.1:52672; saving it in ...

The same behaviour on both Windows and MacOS.

Dementiy commented 5 years ago

JobReply is created with result = None when node fails: https://github.com/pgiri/dispy/blob/103959b373b33055365d8751a3f048ef1af8b635/py3/dispy/dispyscheduler.py#L1645

When reply.result is deserialized on the client side we get an exception: https://github.com/pgiri/dispy/blob/103959b373b33055365d8751a3f048ef1af8b635/py3/dispy/__init__.py#L1835

The easiest way to fix it:

job.result = deserialize(reply.result) if reply.result else None
pgiri commented 5 years ago

I think it is better / more efficient to serialize result in dispyscheduler. Fix committed (I haven't tested though). Thanks for finding the cause and a proposal to fix!