inveniosoftware-contrib / workflow

Simple Pythonic Workflows
https://workflow.readthedocs.io

SYNCHRONIZE does not reuse engine internals #56

Open poinot opened 7 years ago

poinot commented 7 years ago

The SYNCHRONIZE pattern runs concurrent callbacks using threads. However, the call does not fire the before_each_callback and after_each_callback wrappers.

Proposed change in workflow/pattern/controlflow.py (_synchronize):

        for func in args[0:-1]:
            if isinstance(func, (list, tuple)):
                new_eng = eng.duplicate()
                new_eng.setWorkflow(func)
                # Bind new_eng now; a bare closure would see the loop's last value.
                queue.put(lambda new_eng=new_eng: new_eng.process([obj]))
            else:
                # Bind func now for the same reason.
                def t_func(obj, eng, func=func):
                    ecb = eng.processing_factory.action_mapper
                    # Fire the per-callback wrappers around the callback itself.
                    ecb.before_each_callback(eng, func, obj)
                    eng.execute_callback(func, obj)
                    ecb.after_each_callback(eng, func, obj)
                queue.put(lambda t_func=t_func: t_func(obj, eng))
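
The idea behind this change can be tried in isolation. What follows is a minimal sketch, not the library's code: `make_task`, `record`, `double`, `increment`, and `log` are hypothetical names standing in for the engine's per-callback wrappers and the workflow callbacks. It assumes plain `threading.Thread` workers rather than the queue used in `_synchronize`, and it binds each callback when the task is built so that a thread running later still sees the callback it was created with.

```python
import threading


def make_task(func, obj, before, after):
    """Return a zero-argument callable that runs func between two hooks.

    func is bound here, at creation time, so tasks built in a loop do not
    all end up sharing the loop's last callback.
    """
    def task():
        before(func, obj)
        func(obj)
        after(func, obj)
    return task


log = []
log_lock = threading.Lock()


def record(stage):
    # Build a hook that appends (stage, callback name) to the shared log.
    def hook(func, obj):
        with log_lock:
            log.append((stage, func.__name__))
    return hook


def double(obj):
    obj["value"] *= 2  # hypothetical callback


def increment(obj):
    obj["count"] += 1  # hypothetical callback


obj = {"value": 21, "count": 0}
callbacks = [double, increment]

# One thread per callback, each running its own wrapped task.
threads = [
    threading.Thread(target=make_task(func, obj, record("before"), record("after")))
    for func in callbacks
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The order in which the two callbacks run is not deterministic, but within each task the "before" hook always precedes the "after" hook.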
lucasfcardozo commented 5 years ago

I had the same problem. My solution was to add a task_done() call to the run() method of the MySpecialThread class (workflow/pattern/controlflow.py).

class MySpecialThread(threading.Thread):

    def __init__(self, itemq, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.itemq = itemq

    def run(self):
        call = self.itemq.get()
        call()
        self.itemq.task_done()
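
Why the extra call matters can be shown with the standard library alone. The sketch below assumes the queue behaves like `queue.Queue`, whose `join()` blocks until `task_done()` has been called once for every item that was put. `OneShotWorker`, `results`, and `errors` are hypothetical names for illustration, and the `try`/`finally` is an addition that the snippet above does not have: it marks the item as done even when the call raises.

```python
import queue
import threading

results = []
errors = []


class OneShotWorker(threading.Thread):
    """Hypothetical stand-in for MySpecialThread: runs a single queued call."""

    def __init__(self, itemq, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.itemq = itemq

    def run(self):
        call = self.itemq.get()
        try:
            call()
        except Exception as exc:
            # Keep the error for the caller instead of losing it in the thread.
            errors.append(exc)
        finally:
            # Without this, itemq.join() would never return.
            self.itemq.task_done()


itemq = queue.Queue()
workers = [OneShotWorker(itemq, daemon=True) for _ in range(2)]
for w in workers:
    w.start()

itemq.put(lambda: results.append("ok"))
itemq.put(lambda: 1 / 0)  # fails, but is still marked as done

itemq.join()  # returns once both items have been marked done
```

Placing `task_done()` in a `finally` block is a design choice, not something the thread above requires: it keeps a failing callback from leaving the caller waiting on the queue.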