0101 / pipetools

Functional plumbing for Python
https://0101.github.io/pipetools/
MIT License

tqdm support? #5

Open bt2901 opened 6 years ago

bt2901 commented 6 years ago

Suppose I have a pipe out of multiple complicated functions:

long_computation = (pipe | f1 | f2 | f3 | f4)
long_computation(x)

It would be cool to have tqdm support here (togglable with something like long_computation.enable_progressbar()).

I'd like to help implement this, provided it's possible!

0101 commented 6 years ago

Hey, while that should be possible, I'd like to keep this library simple, and this would add a bunch of complexity with regard to the tqdm dependency. And I don't think this is such a common use case that it would be worth it.

I think you should instead create your own subclass of pipetools.main.Pipe and implement it there. Then use your pipe for this type of operation. It can then also be published as a package for others to use.

Let me know if you need any help with the code!

bt2901 commented 6 years ago

Thanks, that makes sense!

I managed to hack something together, but I found myself confused about global state. You have a class Pipe and an object named pipe, which is an instance of it. Most of the methods are @classmethods or @staticmethods, but there's also a self.func attribute. It appears to be a global class attribute, but it doesn't behave like one. What's happening there?

My current solution looks like this:

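from tqdm import tqdm

from pipetools.debug import get_name, set_name
from pipetools.main import Pipe

def get_stack(f):
    # Names of the functions composed into f; a plain function is a stack of one.
    return getattr(f, '__pipetools__stack__', [get_name(f)])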

def set_stack(f, first_function, second_function):
    try:
        new_stack = get_stack(first_function) + [get_name(second_function)]
        f.__pipetools__stack__ = new_stack
    except (AttributeError, UnicodeEncodeError):
        pass
    return f

class ProgressPipe(Pipe):

    @classmethod
    def get_pbar(cls):
        return cls.pbar

    @classmethod
    def set_pbar(cls, pbar):
        cls.pbar = pbar

    @staticmethod
    def compose(first, second):
        name = lambda: '{0} | {1}'.format(get_name(first), get_name(second))
        def composite(*args, **kwargs):
            pbar = ProgressPipe.get_pbar()
            pbar.set_postfix(current_computation=get_name(second), refresh=True)
            result = second(first(*args, **kwargs))
            pbar.update(1)
            return result
        return set_name(name, set_stack(composite, first, second))

    def __lt__(self, thing): raise NotImplementedError

    def __call__(self, *args, **kwargs):
        my_stack = get_stack(self.func)
        with tqdm(total=len(my_stack), unit='F') as pbar:
            pbar.write(get_name(self.func))
            self.set_pbar(pbar)
            result = self.func(*args, **kwargs)
        return result

It isn't very pretty, but it gets the job done. However, I don't like the set_pbar method, as it involves global state. The second problem is that there's no easy way to find out the name of the function currently being computed (current_computation=get_name(second) gets it wrong). I could do it with a similar @classmethod named set_function_stack, but I'm not comfortable with this.

0101 commented 6 years ago

Hey,

So first, there is no global state. Pipe is the class and pipe is just one instance which does nothing. But every piping (|) call creates a new instance. The instances are immutable (well technically they aren't, but shouldn't be mutated). self.func is being set in __init__ so it's a normal instance attribute. If there is a classmethod or staticmethod it's just because they don't need the instance or class to do what they do. However in a subclass this can be changed, as you'll see in my example (bind changed to instance method).
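To illustrate the point about instances, here is a minimal sketch of the pattern. MiniPipe and mini_pipe are hypothetical names for a simplified stand-in, not pipetools' actual code; the sketch only shows that each | returns a fresh object whose func is an ordinary instance attribute.

```python
class MiniPipe:
    """Simplified, hypothetical stand-in for the Pipe pattern (not pipetools' code)."""

    def __init__(self, func=None):
        # Set per instance in __init__, so it is not shared class-level state.
        self.func = func

    def __or__(self, next_func):
        # Every `|` builds and returns a new instance; `self` is left untouched.
        if self.func is None:
            return MiniPipe(next_func)
        first = self.func
        return MiniPipe(lambda *args, **kwargs: next_func(first(*args, **kwargs)))

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


mini_pipe = MiniPipe()  # the "empty" starting instance, which does nothing itself

inc = mini_pipe | (lambda x: x + 1)
inc_then_double = inc | (lambda x: x * 2)
```

After these lines mini_pipe.func is still None, and inc and inc_then_double are separate objects, each holding its own func.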

I was looking into it for a while and actually couldn't figure it out with the way the composition works: it will either miss the first update (like in your example) or require some ugliness (like checking whether one of the functions is already a composite, which wouldn't even be enough if some of the functions were pipes as well).
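The missed first step can be reproduced without pipetools or tqdm. The sketch below is a simplified model of nested binary composition: compose and the log list are hypothetical, with log.append standing in for the progress bar's set_postfix call.

```python
def compose(first, second, log):
    """Binary composition that reports `second` before running anything."""
    def composite(*args, **kwargs):
        # The label is recorded before first() has even started.
        log.append(second.__name__)
        return second(first(*args, **kwargs))
    return composite


def f1(x):
    return x + 1


def f2(x):
    return x * 2


def f3(x):
    return x - 3


log = []
# Equivalent in shape to pipe | f1 | f2 | f3 with immediate composition.
chain = compose(compose(f1, f2, log), f3, log)
result = chain(1)
```

Here log ends up as ['f3', 'f2']: the outermost composite reports first, and f1 is never reported because it only ever appears as the first argument of the innermost composition.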

So to make it work properly I refactored it a bit, so that the functions aren't composed immediately but rather collected into a list and then composed when the pipe is executed (well, instead of a list I used a tuple to make sure it's immutable).
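As a dependency-free sketch of that idea: the functions are stored in a tuple and only folded together with reduce at call time, so each step can be reported just before it runs. LazyPipe and its on_step callback are hypothetical names used for illustration; on_step stands in for the progress bar update.

```python
from functools import reduce


class LazyPipe:
    """Hypothetical sketch: collect functions now, compose them when called."""

    def __init__(self, funcs=(), on_step=None):
        self.funcs = tuple(funcs)  # a tuple, so the collected steps stay immutable
        self.on_step = on_step     # assumed callback, called with each step's name

    def __or__(self, func):
        # Appending returns a new pipe; the existing one is not modified.
        return LazyPipe(self.funcs + (func,), self.on_step)

    def __call__(self, value):
        def step(previous_result, func):
            if self.on_step is not None:
                self.on_step(func.__name__)
            return func(previous_result)

        # Composition happens here, in order, one function at a time.
        return reduce(step, self.funcs, value)


def add_one(x):
    return x + 1


def double(x):
    return x * 2


seen = []
lazy = LazyPipe(on_step=seen.append) | add_one | double
result = lazy(3)
```

Because the steps are kept as data, every function is reported in order (seen becomes ['add_one', 'double']), and lazy.funcs can be inspected after the pipe is built.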

And actually this solution might be better overall for debugging as it should produce better stack traces. Also there will be the possibility to inspect what's in a pipe after it's created. So eventually I might refactor Pipe like this. For now...


from functools import partial, reduce

from tqdm import tqdm

from pipetools.debug import get_name
from pipetools.main import Pipe, prepare_function_for_pipe

class TqdmPipe(Pipe):

    def __init__(self, funcs=()):
        self.funcs = funcs
        self.__name__ = 'Pipe'

    def __str__(self):
        return " | ".join(map(get_name, self.funcs))

    __repr__ = __str__

    def bind(self, first, second, new_cls=None):
        return (new_cls or type(self))(first + second)

    def __or__(self, next_func):
        pipe_in_a_pipe = isinstance(next_func, Pipe) and next_func.func is None
        new_cls = type(next_func) if pipe_in_a_pipe else None
        next = () if pipe_in_a_pipe else (prepare_function_for_pipe(next_func),)
        return self.bind(self.funcs, next, new_cls)

    def __ror__(self, prev_func):
        return self.bind((prepare_function_for_pipe(prev_func),), self.funcs)

    def __lt__(self, thing):
        return self(thing) if self.funcs else thing

    @staticmethod
    def _step(progress, previous_result, func):
        progress.update()
        progress.set_postfix(current_computation=get_name(func))
        return func(previous_result)

    def __call__(self, *args, **kwargs):
        with tqdm(total=len(self.funcs), unit='F') as progress:
            progress.set_postfix(current_computation=get_name(self.funcs[0]))
            result = reduce(partial(self._step, progress), self.funcs[1:], self.funcs[0](*args, **kwargs))
            progress.update()
        return result

tqdm_pipe = TqdmPipe()