inveniosoftware-contrib / workflow

Simple Pythonic Workflows
https://workflow.readthedocs.io
Other
152 stars 46 forks source link

SYNCHRONIZE dead loop #43

Open LinJianping opened 7 years ago

LinJianping commented 7 years ago

I have written a simple test as follow, but it seems the demo would fall into dead loop. Can anyone tell me why? Thanks.

 from workflow.engine import GenericWorkflowEngine
 from workflow.patterns import SYNCHRONIZE
 my_engine = GenericWorkflowEngine()
 def fangxingzaobo(obj,eng):
     print 'fangxingzaobo'
 def shixingzaobo(obj,eng):
     print 'shixingzaobo'
 class ECG_data(object):
     def __init__(self,data):
         self.data = data
 my_workflow_definition = [
              SYNCHRONIZE(shixingzaobo,fangxingzaobo)
              ]
 ecg_data = ECG_data(1)
 my_engine.callbacks.replace(my_workflow_definition)
 my_engine.process([ecg_data])
poinot commented 7 years ago

Fixed:

  1. a lambda in a closure is evaluated at run time. the side effect in workflow/pattern/controlflow.py SYNCRONIZE function is that you only have the last function of the list for all Queue entries. Fixed by forcing a keyword arg with default value in the lambda so that the closure actually as the right function
  2. the join requires a .task_done to complete the threaded call.

patch below (by the way line numbers are for v2.0.1) in workflow/patterns/controlflow.py:

--- ./workflow/patterns/controlflow.py  2017-08-16 15:08:58.917214062 +0200
+++ ../workflow-2.0.1/workflow/patterns/controlflow.py  2017-08-04 19:05:24.000000000 +0200
@@ -383,13 +383,14 @@
             t = MySpecialThread(queue)
             t.setDaemon(True)
             t.start()
+
         for func in args[0:-1]:
             if isinstance(func, list) or isinstance(func, tuple):
                 new_eng = eng.duplicate()
                 new_eng.setWorkflow(func)
                 queue.put(lambda: new_eng.process([obj]))
             else:
-                queue.put(lambda func=func : func(obj, eng))
+                queue.put(lambda: func(obj, eng))

         # wait on the queue until everything has been processed
         queue.join_with_timeout(timeout)
@@ -476,6 +477,7 @@
         finally:
             self.all_tasks_done.release()

+
 class MySpecialThread(threading.Thread):

     def __init__(self, itemq, *args, **kwargs):
@@ -485,4 +487,3 @@
     def run(self):
         call = self.itemq.get()
         call()
-        self.itemq.task_done()