0101 / pipetools

Functional plumbing for Python
https://0101.github.io/pipetools/
MIT License

Does pipetools support async functions? #9

Open nicksspirit opened 4 years ago

0101 commented 4 years ago

Right now it doesn't do anything special for async functions. I suppose you'd want something like:

result = some_input > async_func1 | async_func2 | regular_func 

To produce the equivalent of:

async def f():
    return regular_func(await async_func2(await async_func1(some_input)))

result = f()
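A minimal sketch of that equivalence for any mix of steps. It assumes a hypothetical helper `run_pipeline` (not part of pipetools) and made-up bodies for `async_func1`, `async_func2` and `regular_func`. Each intermediate result is awaited only if it is awaitable, so sync and async steps can be mixed freely.

```python
import asyncio
import inspect


async def run_pipeline(value, *funcs):
    # Hypothetical helper, not a pipetools API: apply each function in order,
    # awaiting the intermediate result whenever a step returns an awaitable.
    for func in funcs:
        value = func(value)
        if inspect.isawaitable(value):
            value = await value
    return value


async def async_func1(x):
    await asyncio.sleep(0)  # stand-in for real async work
    return x + 1


async def async_func2(x):
    await asyncio.sleep(0)
    return x * 10


def regular_func(x):
    return f"result: {x}"


# Same as regular_func(await async_func2(await async_func1(1))), run on an event loop.
result = asyncio.run(run_pipeline(1, async_func1, async_func2, regular_func))
print(result)  # result: 20
```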

Or do you have some other use cases?

The above should probably be doable without breaking any existing functionality.

nicksspirit commented 4 years ago

What you have there is a great start. I would love to help you implement it, but I will probably need guidance.

Do async functions have a dunder attribute that can be checked to tell whether they're async?

(Reply sent by email to 0101's comment of Thu, Dec 12, 2019, 12:07 PM.)

0101 commented 4 years ago

I don't really have much experience with Python's asyncio. From what I quickly tried, it seems that it would have to be deduced from the return value, which in the case of an async function is a coroutine object, and that object has an __await__ method.
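A quick check of that observation, using illustrative `async_func`/`regular_func` stand-ins: calling an async function only creates a coroutine object (its body does not run yet), and that object exposes `__await__`, so a pipe could test each step's return value with `inspect.isawaitable` or `hasattr(value, "__await__")`.

```python
import inspect


async def async_func(x):
    return x * 2


def regular_func(x):
    return x * 2


coro = async_func(21)          # creates a coroutine; the body has not run yet
print(type(coro).__name__)     # coroutine
print(hasattr(coro, "__await__"), inspect.isawaitable(coro))  # True True
print(inspect.isawaitable(regular_func(21)))                  # False

# We never await it in this demo, so close it to avoid a "never awaited" RuntimeWarning.
coro.close()
```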

My initial guess is that Pipe.bind or Pipe.compose would need to be altered to include the detection, and possibly to instantiate a new AsyncPipe type with some overrides.
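To make the idea concrete, here is a stand-alone sketch of what such an `AsyncPipe` type might look like. It does not reuse pipetools' real `Pipe` internals; the class, its operator overloads and the `async_pipe` starting object are assumptions for illustration. Note that Python has no `__rgt__`: for `value > pipe`, when `type(value)` can't compare itself to the pipe, Python falls back to the reflected `pipe.__lt__(value)`, which is the hook used here.

```python
import asyncio
import inspect


class AsyncPipe:
    """Hypothetical sketch, NOT pipetools' real Pipe: a pipe whose steps may be async."""

    def __init__(self, funcs=()):
        self.funcs = tuple(funcs)

    def __or__(self, func):
        # `pipe | func` returns a new pipe with func appended (pipes stay immutable).
        return AsyncPipe(self.funcs + (func,))

    async def __call__(self, value):
        # Run the steps in order, awaiting any step whose result is awaitable.
        for func in self.funcs:
            value = func(value)
            if inspect.isawaitable(value):
                value = await value
        return value

    def __lt__(self, value):
        # `value > pipe` falls back to the reflected `pipe.__lt__(value)` when
        # type(value) has no __gt__ for AsyncPipe; the result is a coroutine.
        return self(value)


async_pipe = AsyncPipe()  # hypothetical empty starting pipe


async def double(x):
    await asyncio.sleep(0)  # stand-in for real async work
    return x * 2


pipeline = async_pipe | double | str
print(asyncio.run(5 > pipeline))  # 10
```

Because `|` binds tighter than `>`, `5 > async_pipe | double | str` parses as `5 > (async_pipe | double | str)`, matching the pipetools syntax from the first comment.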

Also, some more thought is needed on whether doing this automatically could prevent legitimate use cases where someone wants to manipulate coroutines without awaiting them. In that case some explicit annotation would be better (e.g. input > wait_for(async_func) | regular_func, or input > async_pipe | async_func | regular_func, or something like that).
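One concrete case of that concern, sketched with two hypothetical runners: `run_plain` passes coroutines along untouched, while `run_auto_awaiting` awaits every awaitable result. A step that deliberately wants the un-awaited coroutine (here `with_timeout`, which wraps it in `asyncio.wait_for` so the timeout covers the whole call) works with the first and breaks with the second.

```python
import asyncio
import inspect


async def fetch(x):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return x * 2


def with_timeout(coro):
    # Deliberately wants the *un-awaited* coroutine so the timeout covers the whole call.
    return asyncio.wait_for(coro, timeout=1.0)


def run_plain(value, *funcs):
    # Hypothetical: plain composition, coroutines are passed along untouched.
    for func in funcs:
        value = func(value)
    return value


async def run_auto_awaiting(value, *funcs):
    # Hypothetical: awaits every awaitable result before calling the next step.
    for func in funcs:
        value = func(value)
        if inspect.isawaitable(value):
            value = await value
    return value


# Explicit handling works: wait_for receives the fetch coroutine itself.
plain_result = asyncio.run(run_plain(5, fetch, with_timeout))
print(plain_result)  # 10

# Automatic awaiting runs fetch(5) first, so with_timeout receives 10, not a coroutine.
try:
    asyncio.run(run_auto_awaiting(5, fetch, with_timeout))
    auto_failed = False
except TypeError as exc:
    auto_failed = True
    print("auto-awaiting broke the pipeline:", exc)
```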

It'd be great if you helped with the implementation, especially if you have some real-world use cases against which the solution could be verified.

leosussan commented 4 years ago

First - really love this project, thank you for making this!

Second - IMO the desired syntax would be something like (if possible):

result = some_input > await async_func1 | await async_func2 | regular_func

nicksspirit commented 4 years ago

@leosussan I don't think that is possible, because you have to call the function before there is anything to await: calling an async function returns a coroutine, and it is that coroutine which gets awaited. The tricky question is whether, when a function in the pipeline is awaited, the entire pipeline pauses or only that one function.

so

result = some_input > await async_func1() | await async_func2() | regular_func 

I would think the former, since piping is an inherently synchronous operation. Maybe under the hood we would want to collect all the functions and pass them to asyncio.wait or asyncio.gather.
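For reference, awaiting inside a pipeline suspends only the coroutine running that pipeline, not the whole program, and the steps of a single pipeline can't be gathered because each needs the previous result. A sketch, with made-up `step1`/`step2`, of where `asyncio.gather` does fit: running several independent pipelines concurrently.

```python
import asyncio


async def step1(x):
    await asyncio.sleep(0.01)  # simulated I/O
    return x + 1


async def step2(x):
    await asyncio.sleep(0.01)
    return x * 2


async def pipeline(x):
    # Within one pipeline the steps are sequential: step2 needs step1's result.
    # Each `await` suspends only this coroutine; the event loop keeps running others.
    return await step2(await step1(x))


async def main():
    # asyncio.gather pays off across independent inputs: the three pipelines
    # interleave, so this takes roughly two sleep intervals rather than six.
    return await asyncio.gather(*(pipeline(x) for x in (1, 2, 3)))


print(asyncio.run(main()))  # [4, 6, 8]
```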

This is an interesting problem; I have been thinking about it for a while now.

nicksspirit commented 4 years ago

Maybe the paco package can provide some insight; its docs have this example:

import paco

async def filterer(x):
    return x < 8

async def mapper(x):
    return x * 2

async def drop(x):
    return x < 10

async def reducer(acc, x):
    return acc + x

async def task(numbers):
    return await (numbers
                   | paco.filter(filterer)
                   | paco.map(mapper)
                   | paco.dropwhile(drop)
                   | paco.reduce(reducer, initializer=0))

# Run in event loop
number = paco.run(task((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
print('Number:', number)  # => 36

0101 commented 4 years ago

Using the await keyword when constructing the pipe won't be possible, because arguments for those functions aren't available at that time.
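A quick way to see why: `await` needs an awaitable at the moment it is evaluated, and a bare async function object is not one (only the coroutine returned by calling it is). The name `await_at_build_time` is illustrative; it mimics writing `await async_func1` while the pipe is being built.

```python
import asyncio


async def async_func1(x):
    return x + 1


async def await_at_build_time():
    # While the pipe is being built there is no argument yet, so only the
    # bare function object is available to `await`, and that is not awaitable.
    return await async_func1


build_time_error = None
try:
    asyncio.run(await_at_build_time())
except TypeError as exc:
    build_time_error = exc
    print("TypeError:", exc)
```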

If it isn't possible or desirable to automatically detect if a given function is async, then it would have to be wrapped in some helper function, so you'd get something like:

result = some_input > await_(async_func1) | await_(async_func2) | regular_func 

(And the result still has to be awaited or passed to some event loop to execute)
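A minimal sketch of the explicit `await_` approach, assuming a hypothetical marker type `Await` and runner `run_steps` (neither exists in pipetools). Only marked steps are awaited, so coroutines returned by unmarked steps pass through untouched, and the whole run is itself a coroutine that still needs an event loop.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Await:
    """Hypothetical marker: the wrapped step's result should be awaited."""
    func: Callable[[Any], Any]


def await_(func):
    # Hypothetical helper matching the syntax proposed above.
    return Await(func)


async def run_steps(value, *steps):
    # Await only the marked steps; everything else is called as-is.
    for step in steps:
        if isinstance(step, Await):
            value = await step.func(value)
        else:
            value = step(value)
    return value


async def async_func1(x):
    await asyncio.sleep(0)
    return x + 1


async def async_func2(x):
    await asyncio.sleep(0)
    return x * 3


def regular_func(x):
    return -x


pending = run_steps(1, await_(async_func1), await_(async_func2), regular_func)
# Nothing has run yet: `pending` is a coroutine that must be awaited or run on a loop.
print(asyncio.run(pending))  # -6
```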

leosussan commented 4 years ago

Use inspect.iscoroutinefunction(object) to detect if a function is async.

See: https://docs.python.org/3/library/inspect.html#inspect.iscoroutinefunction
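A short comparison of the two detection approaches discussed in this thread, with illustrative functions. `async def` functions carry the `CO_COROUTINE` flag in `__code__.co_flags` (the dunder-level answer to the earlier question), which is what `inspect.iscoroutinefunction` checks. A plain function that merely returns a coroutine, such as a wrapper or decorator, is not flagged, while checking the return value still catches it.

```python
import inspect


async def async_func(x):
    return x


def regular_func(x):
    return x


def wrapper(x):
    # A plain (non-async) function that returns a coroutine, e.g. a decorator wrapper.
    return async_func(x)


print(bool(async_func.__code__.co_flags & inspect.CO_COROUTINE))  # True
print(inspect.iscoroutinefunction(async_func))    # True
print(inspect.iscoroutinefunction(regular_func))  # False
print(inspect.iscoroutinefunction(wrapper))       # False, though it returns a coroutine

result = wrapper(1)
print(inspect.isawaitable(result))  # True: checking the return value catches this case
result.close()  # never awaited in this demo; close it to avoid a RuntimeWarning
```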